From: robertwb@apache.org
To: commits@beam.apache.org
Date: Thu, 12 Jan 2017 20:17:22 -0000
Subject: [1/3] beam git commit: [BEAM-1188] Python File Verifier For E2E Tests

Repository: beam
Updated Branches:
  refs/heads/python-sdk 4ba0b60a8 -> e23c3cab3


[BEAM-1188] Python File Verifier For E2E Tests

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/feca7cfe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/feca7cfe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/feca7cfe

Branch: refs/heads/python-sdk
Commit: feca7cfe45a33f46f55da23368a7ab601d307b62
Parents: 4ba0b60
Author: Mark Liu
Authored: Mon Jan 9 23:48:42 2017 -0800
Committer: Robert Bradshaw
Committed: Thu Jan 12 12:17:07 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/wordcount_it_test.py  | 23 ++++--
 .../apache_beam/io/datastore/v1/helper_test.py | 31 +--------
 sdks/python/apache_beam/test_pipeline.py       | 67 +++++++++++-------
 sdks/python/apache_beam/test_pipeline_test.py  | 73 ++++++++++++++++----
 .../apache_beam/tests/pipeline_verifiers.py    | 67 ++++++++++++++++++
 .../tests/pipeline_verifiers_test.py           | 65 +++++++++++++++--
 sdks/python/apache_beam/tests/test_utils.py    | 47 +++++++++++++
 7 files changed, 298 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index b74e075..bf63688 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -20,23 +20,38 @@
 import logging
 import unittest
 
+from datetime import datetime as dt
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
 from apache_beam.examples import wordcount
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher
-from nose.plugins.attrib import attr
+from apache_beam.tests.pipeline_verifiers import FileChecksumMatcher
 
 
 class WordCountIT(unittest.TestCase):
 
+  DEFAULT_CHECKSUM = 'c780e9466b8635af1d11b74bbd35233a82908a02'
+
   @attr('IT')
   def test_wordcount_it(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
     # Set extra options to the pipeline for test purpose
-    extra_opts = {'on_success_matcher': PipelineStateMatcher()}
+    output = '/'.join([test_pipeline.get_option('output'),
+                       dt.now().strftime('py-wordcount-%Y-%m-%d-%H-%M-%S'),
+                       'results'])
+    pipeline_verifiers = [PipelineStateMatcher(),
+                          FileChecksumMatcher(output + '*-of-*',
+                                              self.DEFAULT_CHECKSUM)]
+    extra_opts = {'output': output,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
 
     # Get pipeline options from command argument: --test-pipeline-options,
     # and start pipeline job by calling pipeline main function.
-    test_pipeline = TestPipeline(is_integration_test=True)
-    wordcount.run(test_pipeline.get_test_option_args(**extra_opts))
+    wordcount.run(test_pipeline.get_full_options_as_args(**extra_opts))
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)


http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/io/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
index 6f45993..051d150 100644
--- a/sdks/python/apache_beam/io/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/datastore/v1/helper_test.py
@@ -16,7 +16,6 @@
 #
 
 """Tests for datastore helper."""
-import imp
 import sys
 import unittest
 
@@ -26,11 +25,11 @@
 from google.datastore.v1 import query_pb2
 from google.datastore.v1.entity_pb2 import Key
 from googledatastore.connection import RPCError
 from googledatastore import helper as datastore_helper
-from mock import MagicMock, Mock, patch
+from mock import MagicMock
 
 from apache_beam.io.datastore.v1 import fake_datastore
 from apache_beam.io.datastore.v1 import helper
-from apache_beam.utils import retry
+from apache_beam.tests.test_utils import patch_retry
 
 
 class HelperTest(unittest.TestCase):
 
@@ -39,31 +38,7 @@
     self._mock_datastore = MagicMock()
     self._query = query_pb2.Query()
     self._query.kind.add().name = 'dummy_kind'
-    self.patch_retry()
-
-  def patch_retry(self):
-
-    """A function to patch retry module to use mock clock and logger."""
-    real_retry_with_exponential_backoff = retry.with_exponential_backoff
-
-    def patched_retry_with_exponential_backoff(num_retries, retry_filter):
-      """A patch for retry decorator to use a mock dummy clock and logger."""
-      return real_retry_with_exponential_backoff(
-          num_retries=num_retries, retry_filter=retry_filter, logger=Mock(),
-          clock=Mock())
-
-    patch.object(retry, 'with_exponential_backoff',
-                 side_effect=patched_retry_with_exponential_backoff).start()
-
-    # Reload module after patching.
-    imp.reload(helper)
-
-    def kill_patches():
-      patch.stopall()
-      # Reload module again after removing patch.
-      imp.reload(helper)
-
-    self.addCleanup(kill_patches)
+    patch_retry(self, helper)
 
   def permanent_datastore_failure(self, req):
     raise RPCError("dummy", 500, "failed")


http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index 48b98b2..69040d1 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -78,34 +78,18 @@
       expected type.
     """
     self.is_integration_test = is_integration_test
+    self.options_list = self._parse_test_option_args(argv)
     if options is None:
-      options = PipelineOptions(self.get_test_option_args(argv))
+      options = PipelineOptions(self.options_list)
     super(TestPipeline, self).__init__(runner, options)
 
-  def _append_extra_opts(self, opt_list, extra_opts):
-    """Append extra pipeline options to existing option list.
-
-    Test verifier (if contains) should be pickled before append, and
-    will be unpickled later in TestRunner.
-    """
-    for k, v in extra_opts.items():
-      if not v:
-        continue
-      elif isinstance(v, bool) and v:
-        opt_list.append('--%s' % k)
-      elif 'matcher' in k:
-        opt_list.append('--%s=%s' % (k, pickler.dumps(v)))
-      else:
-        opt_list.append('--%s=%s' % (k, v))
-
-  def get_test_option_args(self, argv=None, **kwargs):
-    """Get pipeline options as argument list by parsing value of command line
-    argument: --test-pipeline-options combined with given extra options.
+  def _parse_test_option_args(self, argv):
+    """Parse the value of the command line argument --test-pipeline-options
+    to get pipeline options.
 
     Args:
       argv: An iterable of command line arguments to be used. If not specified
        then sys.argv will be used as input for parsing arguments.
-      kwargs: Extra pipeline options for the test.
 
     Returns:
       An argument list of options that can be parsed by argparser or directly
@@ -125,10 +109,43 @@
       raise SkipTest('IT is skipped because --test-pipeline-options '
                      'is not specified')
 
-    options_list = shlex.split(known.test_pipeline_options) \
+    return shlex.split(known.test_pipeline_options) \
       if known.test_pipeline_options else []
 
-    if kwargs:
-      self._append_extra_opts(options_list, kwargs)
+  def get_full_options_as_args(self, **extra_opts):
+    """Get full pipeline options as an argument list.
+
+    Appends extra pipeline options to the existing option list if provided.
+    Test verifiers (if present in the extra options) are pickled before being
+    appended, and will be unpickled later in the TestRunner.
+    """
+    options = list(self.options_list)
+    for k, v in extra_opts.items():
+      if not v:
+        continue
+      elif isinstance(v, bool) and v:
+        options.append('--%s' % k)
+      elif 'matcher' in k:
+        options.append('--%s=%s' % (k, pickler.dumps(v)))
+      else:
+        options.append('--%s=%s' % (k, v))
+    return options
+
+  def get_option(self, opt_name):
+    """Get a pipeline option value by name.
 
-    return options_list
+    Args:
+      opt_name: The name of the pipeline option.
+
+    Returns:
+      None if the option is not found in the existing option list, which is
+      generated by parsing the value of the `test-pipeline-options` argument.
+    """
+    parser = argparse.ArgumentParser()
+    opt_name = opt_name[2:] if opt_name[:2] == '--' else opt_name
+    # Option name should start with '--' when it's used for parsing.
+    parser.add_argument('--' + opt_name,
+                        type=str,
+                        action='store')
+    known, _ = parser.parse_known_args(self.options_list)
+    return getattr(known, opt_name) if hasattr(known, opt_name) else None


http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
index 42ba2d7..41b935c 100644
--- a/sdks/python/apache_beam/test_pipeline_test.py
+++ b/sdks/python/apache_beam/test_pipeline_test.py
@@ -17,12 +17,24 @@
 
 """Unit test for the TestPipeline class"""
 
+import logging
 import unittest
 
+import mock
+from hamcrest.core.base_matcher import BaseMatcher
+from hamcrest.core.assert_that import assert_that as hc_assert_that
+
+from apache_beam.internal import pickler
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.utils.pipeline_options import PipelineOptions
 
 
+# A simple matcher that is used for testing extra options appending.
+class SimpleMatcher(BaseMatcher):
+  def _matches(self, item):
+    return True
+
+
 class TestPipelineTest(unittest.TestCase):
 
   TEST_CASE = {'options':
@@ -32,9 +44,6 @@
                'male': True,
                'age': 1}}
 
-  def setUp(self):
-    self.pipeline = TestPipeline()
-
   # Used for testing pipeline option creation.
   class TestParsingOptions(PipelineOptions):
 
@@ -45,19 +54,57 @@
     parser.add_argument('--age', action='store', type=int, help='mock age')
 
   def test_option_args_parsing(self):
+    test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
     self.assertListEqual(
-        self.pipeline.get_test_option_args(argv=self.TEST_CASE['options']),
+        test_pipeline.get_full_options_as_args(),
         self.TEST_CASE['expected_list'])
 
+  def test_empty_option_args_parsing(self):
+    test_pipeline = TestPipeline()
+    self.assertListEqual([],
+                         test_pipeline.get_full_options_as_args())
+
   def test_create_test_pipeline_options(self):
-    test_options = PipelineOptions(
-        self.pipeline.get_test_option_args(self.TEST_CASE['options']))
-    self.assertDictContainsSubset(
-        self.TEST_CASE['expected_dict'], test_options.get_all_options())
+    test_pipeline = TestPipeline(argv=self.TEST_CASE['options'])
+    test_options = PipelineOptions(test_pipeline.get_full_options_as_args())
+    self.assertDictContainsSubset(self.TEST_CASE['expected_dict'],
+                                  test_options.get_all_options())
+
+  EXTRA_OPT_CASES = [
+      {'options': {'name': 'Mark'},
+       'expected': ['--name=Mark']},
+      {'options': {'student': True},
+       'expected': ['--student']},
+      {'options': {'student': False},
+       'expected': []}
+  ]
 
   def test_append_extra_options(self):
-    extra_opt = {'name': 'Mark'}
-    options_list = self.pipeline.get_test_option_args(
-        argv=self.TEST_CASE['options'], **extra_opt)
-    expected_list = self.TEST_CASE['expected_list'] + ['--name=Mark']
-    self.assertListEqual(expected_list, options_list)
+    test_pipeline = TestPipeline()
+    for case in self.EXTRA_OPT_CASES:
+      opt_list = test_pipeline.get_full_options_as_args(**case['options'])
+      self.assertListEqual(opt_list, case['expected'])
+
+  def test_append_verifier_in_extra_opt(self):
+    extra_opt = {'matcher': SimpleMatcher()}
+    opt_list = TestPipeline().get_full_options_as_args(**extra_opt)
+    matcher = pickler.loads(opt_list[0].split('=')[1])
+    self.assertTrue(isinstance(matcher, BaseMatcher))
+    hc_assert_that(mock.Mock(), matcher)
+
+  def test_get_option(self):
+    name, value = ('job', 'mockJob')
+    test_pipeline = TestPipeline()
+    test_pipeline.options_list = ['--%s=%s' % (name, value)]
+    self.assertEqual(test_pipeline.get_option(name), value)
+
+  def test_skip_IT(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    test_pipeline.run()
+    # Note that this will never be reached since it should be skipped above.
+    self.fail()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()


http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 1a6dd45..efcfbdf 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -22,9 +22,16 @@
 of test pipeline job. Customized verifier should extend
 `hamcrest.core.base_matcher.BaseMatcher` and override _matches.
 """
 
+import hashlib
+import logging
+
+from apache_beam.io.fileio import TextFileSource
 from apache_beam.runners.runner import PipelineState
+from apache_beam.utils import retry
 from hamcrest.core.base_matcher import BaseMatcher
 
+MAX_RETRIES = 4
+
 
 class PipelineStateMatcher(BaseMatcher):
   """Matcher that verify pipeline job terminated in expected state
@@ -48,3 +55,63 @@
     mismatch_description \
         .append_text("Test pipeline job terminated in state: ") \
         .append_text(pipeline_result.current_state())
+
+
+def retry_on_fileio_error(exception):
+  """Filter allowing retries on file I/O errors."""
+  if isinstance(exception, RuntimeError) or \
+     isinstance(exception, IOError):
+    # GCS I/O raises RuntimeError and local filesystem I/O
+    # raises IOError when file reading fails.
+    return True
+  else:
+    return False
+
+
+class FileChecksumMatcher(BaseMatcher):
+  """Matcher that verifies file(s) content by comparing file checksum.
+
+  Use fileio to fetch file(s) from given path. Currently, fileio supports
+  local filesystem and GCS.
+
+  File checksum is a SHA-1 hash computed from content of file(s).
+ """ + + def __init__(self, file_path, expected_checksum): + self.file_path = file_path + self.expected_checksum = expected_checksum + + @retry.with_exponential_backoff(num_retries=MAX_RETRIES, + retry_filter=retry_on_fileio_error) + def _read_with_retry(self): + """Read path with retry if I/O failed""" + source = TextFileSource(self.file_path) + read_lines = [] + with source.reader() as reader: + for line in reader: + read_lines.append(line) + return read_lines + + def _matches(self, _): + # Read from given file(s) path + read_lines = self._read_with_retry() + + # Compute checksum + read_lines.sort() + m = hashlib.new('sha1') + for line in read_lines: + m.update(line) + self.checksum, num_lines = (m.hexdigest(), len(read_lines)) + logging.info('Read from given path %s, %d lines, checksum: %s.', + self.file_path, num_lines, self.checksum) + return self.checksum == self.expected_checksum + + def describe_to(self, description): + description \ + .append_text("Expected checksum is ") \ + .append_text(self.expected_checksum) + + def describe_mismatch(self, pipeline_result, mismatch_description): + mismatch_description \ + .append_text("Actual checksum is ") \ + .append_text(self.checksum) http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/tests/pipeline_verifiers_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py index 9f70b45..91dedad 100644 --- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py +++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py @@ -18,27 +18,37 @@ """Unit tests for the test pipeline verifiers""" import logging +import tempfile import unittest +from hamcrest import assert_that as hc_assert_that +from mock import Mock, patch + +from apache_beam.io.fileio import TextFileSource from apache_beam.runners.runner import PipelineState from apache_beam.runners.runner import PipelineResult -from apache_beam.tests.pipeline_verifiers import PipelineStateMatcher -from hamcrest import assert_that as hc_assert_that +from apache_beam.tests import pipeline_verifiers as verifiers +from apache_beam.tests.test_utils import patch_retry class PipelineVerifiersTest(unittest.TestCase): + def setUp(self): + self._mock_result = Mock() + patch_retry(self, verifiers) + def test_pipeline_state_matcher_success(self): """Test PipelineStateMatcher successes when using default expected state and job actually finished in DONE """ pipeline_result = PipelineResult(PipelineState.DONE) - hc_assert_that(pipeline_result, PipelineStateMatcher()) + hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher()) def test_pipeline_state_matcher_given_state(self): """Test PipelineStateMatcher successes when matches given state""" pipeline_result = PipelineResult(PipelineState.FAILED) - hc_assert_that(pipeline_result, PipelineStateMatcher(PipelineState.FAILED)) + hc_assert_that(pipeline_result, + verifiers.PipelineStateMatcher(PipelineState.FAILED)) def test_pipeline_state_matcher_fails(self): """Test PipelineStateMatcher fails when using default expected state @@ -53,7 +63,52 @@ class PipelineVerifiersTest(unittest.TestCase): for state in failed_state: pipeline_result = PipelineResult(state) with self.assertRaises(AssertionError): - hc_assert_that(pipeline_result, PipelineStateMatcher()) + hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher()) + + test_cases = [ + {'content': 'Test FileChecksumMatcher 
with single file', + 'num_files': 1, + 'expected_checksum': 'ebe16840cc1d0b4fe1cf71743e9d772fa31683b8'}, + {'content': 'Test FileChecksumMatcher with multiple files', + 'num_files': 3, + 'expected_checksum': '58b3d3636de3891ac61afb8ace3b5025c3c37d44'}, + {'content': '', + 'num_files': 1, + 'expected_checksum': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'}, + ] + + def create_temp_file(self, content, directory=None): + with tempfile.NamedTemporaryFile(delete=False, dir=directory) as f: + f.write(content) + return f.name + + def test_file_checksum_matcher_success(self): + for case in self.test_cases: + temp_dir = tempfile.mkdtemp() + for _ in range(case['num_files']): + self.create_temp_file(case['content'], temp_dir) + matcher = verifiers.FileChecksumMatcher(temp_dir + '/*', + case['expected_checksum']) + hc_assert_that(self._mock_result, matcher) + + @patch.object(TextFileSource, 'reader') + def test_file_checksum_matcher_read_failed(self, mock_reader): + mock_reader.side_effect = IOError('No file found.') + matcher = verifiers.FileChecksumMatcher('dummy/path', Mock()) + with self.assertRaises(IOError): + hc_assert_that(self._mock_result, matcher) + self.assertTrue(mock_reader.called) + self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count) + + @patch.object(TextFileSource, 'reader') + def test_file_checksum_matcher_service_error(self, mock_reader): + mock_reader.side_effect = RuntimeError('GCS service failed.') + matcher = verifiers.FileChecksumMatcher('dummy/path', Mock()) + with self.assertRaises(RuntimeError): + hc_assert_that(self._mock_result, matcher) + self.assertTrue(mock_reader.called) + self.assertEqual(verifiers.MAX_RETRIES + 1, mock_reader.call_count) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) http://git-wip-us.apache.org/repos/asf/beam/blob/feca7cfe/sdks/python/apache_beam/tests/test_utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py new file mode 100644 index 0000000..fc99fe9 --- /dev/null +++ b/sdks/python/apache_beam/tests/test_utils.py @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Utility methods for testing""" + +import imp +from mock import Mock, patch + +from apache_beam.utils import retry + + +def patch_retry(cls, module): + """A function to patch retry module to use mock clock and logger.""" + real_retry_with_exponential_backoff = retry.with_exponential_backoff + + def patched_retry_with_exponential_backoff(num_retries, retry_filter): + """A patch for retry decorator to use a mock dummy clock and logger.""" + return real_retry_with_exponential_backoff( + num_retries=num_retries, retry_filter=retry_filter, logger=Mock(), + clock=Mock()) + + patch.object(retry, 'with_exponential_backoff', + side_effect=patched_retry_with_exponential_backoff).start() + + # Reload module after patching. + imp.reload(module) + + def kill_patches(): + patch.stopall() + # Reload module again after removing patch. + imp.reload(module) + + cls.addCleanup(kill_patches)