beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/3] beam git commit: [BEAM-2236] Move test utilities out of python core
Date Wed, 10 May 2017 01:17:06 GMT
Repository: beam
Updated Branches:
  refs/heads/master 78a4b8e4f -> e2be6961d


http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/tests/data/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/data/standard_coders.yaml b/sdks/python/apache_beam/tests/data/standard_coders.yaml
deleted file mode 100644
index 790cacb..0000000
--- a/sdks/python/apache_beam/tests/data/standard_coders.yaml
+++ /dev/null
@@ -1,196 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# This file is broken into multiple sections delimited by ---. Each section specifies a set of
-# reference encodings for a single standardized coder used in a specific context.
-#
-# Each section contains up to 3 properties:
-#
-#   coder: a common coder spec. Currently, a URN and URNs for component coders as necessary.
-#   nested: a boolean meaning whether the coder was used in the nested context. Missing means to
-#           test both contexts, a shorthand for when the coder is invariant across context.
-#   examples: a map of {encoded bytes: original JSON object} encoded with the coder in the context.
-#             The LHS (key) is a byte array encoded as a JSON-escaped string. The RHS (value) is
-#             one of a few standard JSON types such as numbers, strings, dicts that map naturally
-#             to the type encoded by the coder.
-#
-# These choices were made to strike a balance between portability, ease of use, and simple
-# legibility of this file itself.
-#
-# It is expected that future work will move the `coder` field into a format that it would be
-# represented by the Runner API, so that it can be understood by all SDKs and harnesses.
-#
-# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated.
-
-
-
-coder:
-  urn: "urn:beam:coders:bytes:0.1"
-nested: false
-examples:
-  "abc": abc
-  "ab\0c": "ab\0c"
-
----
-
-coder:
-  urn: "urn:beam:coders:bytes:0.1"
-nested: true
-examples:
-  "\u0003abc": abc
-  "\u0004ab\0c": "ab\0c"
-  "\u00c8\u0001       10|       20|       30|       40|       50|       60|       70|   
   80|       90|      100|      110|      120|      130|      140|      150|      160|   
  170|      180|      190|      200|":
-              "       10|       20|       30|       40|       50|       60|       70|   
   80|       90|      100|      110|      120|      130|      140|      150|      160|   
  170|      180|      190|      200|"
-
----
-
-coder:
-  urn: "urn:beam:coders:varint:0.1"
-examples:
-  "\0": 0
-  "\u0001": 1
-  "\u000A": 10
-  "\u00c8\u0001": 200
-  "\u00e8\u0007": 1000
-  "\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1
-
----
-
-coder:
-  urn: "urn:beam:coders:kv:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"},
-               {urn: "urn:beam:coders:varint:0.1"}]
-examples:
-  "\u0003abc\0": {key: abc, value: 0}
-  "\u0004ab\0c\u000A": {key: "ab\0c", value: 10}
-
----
-
-coder:
-  urn: "urn:beam:coders:kv:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"},
-               {urn: "urn:beam:coders:bytes:0.1"}]
-nested: false
-examples:
-  "\u0003abcdef": {key: abc, value: def}
-  "\u0004ab\0cde\0f": {key: "ab\0c", value: "de\0f"}
-
----
-
-coder:
-  urn: "urn:beam:coders:kv:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"},
-               {urn: "urn:beam:coders:bytes:0.1"}]
-nested: true
-examples:
-  "\u0003abc\u0003def": {key: abc, value: def}
-  "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"}
-
----
-
-coder:
-  urn: "urn:beam:coders:interval_window:0.1"
-examples:
-  "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000}
-  "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000}
-  "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: -9223372036854410, span: 365}
-  "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 9223372036854775, span: 0}
-
----
-
-coder:
-  urn: "urn:beam:coders:stream:0.1"
-  components: [{urn: "urn:beam:coders:varint:0.1"}]
-examples:
-  "\0\0\0\u0001\0": [0]
-  "\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000]
-  "\0\0\0\0": []
-
----
-
-coder:
-  urn: "urn:beam:coders:stream:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"}]
-examples:
-  "\0\0\0\u0001\u0003abc": ["abc"]
-  "\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
-  "\0\0\0\0": []
-
----
-
-coder:
-  urn: "urn:beam:coders:stream:0.1"
-  components: [{urn: "urn:beam:coders:bytes:0.1"}]
-  # This is for iterables of unknown length, where the encoding is not
-  # deterministic.
-  non_deterministic: True
-examples:
-  "\u00ff\u00ff\u00ff\u00ff\u0000": []
-  "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"]
-  "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"]
-
----
-
-coder:
-  urn: "urn:beam:coders:stream:0.1"
-  components: [{urn: "urn:beam:coders:global_window:0.1"}]
-examples:
-  "\0\0\0\u0001": [""]
-
----
-
-coder:
-  urn: "urn:beam:coders:global_window:0.1"
-examples:
-  "": ""
-
----
-
-# All windowed values consist of pane infos that represent NO_FIRING until full support is added
-# in the Python SDK (BEAM-1522).
-coder:
-  urn: "urn:beam:coders:windowed_value:0.1"
-  components: [{urn: "urn:beam:coders:varint:0.1"},
-               {urn: "urn:beam:coders:global_window:0.1"}]
-examples:
-  "\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
-    value: 2,
-    timestamp: 1454293425000,
-    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
-    windows: ["global"]
-  }
-
----
-
-coder:
-  urn: "urn:beam:coders:windowed_value:0.1"
-  components: [{urn: "urn:beam:coders:varint:0.1"},
-               {urn: "urn:beam:coders:interval_window:0.1"}]
-examples:
-  "\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": {
-    value: 4,
-    timestamp: -400000,
-    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
-    windows: [{end: 1454293425000, span: 280000}]
-  }
-
-  "\u007f\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u009c\0\0\0\u0002\u0080\0\u0001R\u009a\u00a4\u009bh\u0080\u00dd\u00db\u0001\u007f\u00df;dZ\u001c\u00adv\u00ed\u0002\u000f\u0002": {
-    value: 2,
-    timestamp: -100,
-    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
-    windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}]
-  }

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
deleted file mode 100644
index df05054..0000000
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ /dev/null
@@ -1,146 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""End-to-end test result verifiers
-
-A set of verifiers that are used in end-to-end tests to verify state/output
-of test pipeline job. Customized verifier should extend
-`hamcrest.core.base_matcher.BaseMatcher` and override _matches.
-"""
-
-import logging
-import time
-
-from hamcrest.core.base_matcher import BaseMatcher
-
-from apache_beam.io.filesystems import FileSystems
-from apache_beam.runners.runner import PipelineState
-from apache_beam.tests import test_utils as utils
-from apache_beam.utils import retry
-
-try:
-  from apitools.base.py.exceptions import HttpError
-except ImportError:
-  HttpError = None
-
-MAX_RETRIES = 4
-
-
-class PipelineStateMatcher(BaseMatcher):
-  """Matcher that verify pipeline job terminated in expected state
-
-  Matcher compares the actual pipeline terminate state with expected.
-  By default, `PipelineState.DONE` is used as expected state.
-  """
-
-  def __init__(self, expected_state=PipelineState.DONE):
-    self.expected_state = expected_state
-
-  def _matches(self, pipeline_result):
-    return pipeline_result.state == self.expected_state
-
-  def describe_to(self, description):
-    description \
-      .append_text("Test pipeline expected terminated in state: ") \
-      .append_text(self.expected_state)
-
-  def describe_mismatch(self, pipeline_result, mismatch_description):
-    mismatch_description \
-      .append_text("Test pipeline job terminated in state: ") \
-      .append_text(pipeline_result.state)
-
-
-def retry_on_io_error_and_server_error(exception):
-  """Filter allowing retries on file I/O errors and service error."""
-  return isinstance(exception, IOError) or \
-          (HttpError is not None and isinstance(exception, HttpError))
-
-
-class FileChecksumMatcher(BaseMatcher):
-  """Matcher that verifies file(s) content by comparing file checksum.
-
-  Use apache_beam.io.fileio to fetch file(s) from given path. File checksum
-  is a hash string computed from content of file(s).
-  """
-
-  def __init__(self, file_path, expected_checksum, sleep_secs=None):
-    """Initialize a FileChecksumMatcher object
-
-    Args:
-      file_path : A string that is the full path of output file. This path
-        can contain globs.
-      expected_checksum : A hash string that is computed from expected
-        result.
-      sleep_secs : Number of seconds to wait before verification start.
-        Extra time are given to make sure output files are ready on FS.
-    """
-    if sleep_secs is not None:
-      if isinstance(sleep_secs, int):
-        self.sleep_secs = sleep_secs
-      else:
-        raise ValueError('Sleep seconds, if received, must be int. '
-                         'But received: %r, %s' % (sleep_secs,
-                                                   type(sleep_secs)))
-    else:
-      self.sleep_secs = None
-
-    self.file_path = file_path
-    self.expected_checksum = expected_checksum
-
-  @retry.with_exponential_backoff(
-      num_retries=MAX_RETRIES,
-      retry_filter=retry_on_io_error_and_server_error)
-  def _read_with_retry(self):
-    """Read path with retry if I/O failed"""
-    read_lines = []
-    match_result = FileSystems.match([self.file_path])[0]
-    matched_path = [f.path for f in match_result.metadata_list]
-    if not matched_path:
-      raise IOError('No such file or directory: %s' % self.file_path)
-
-    logging.info('Find %d files in %s: \n%s',
-                 len(matched_path), self.file_path, '\n'.join(matched_path))
-    for path in matched_path:
-      with FileSystems.open(path, 'r') as f:
-        for line in f:
-          read_lines.append(line)
-    return read_lines
-
-  def _matches(self, _):
-    if self.sleep_secs:
-      # Wait to have output file ready on FS
-      logging.info('Wait %d seconds...', self.sleep_secs)
-      time.sleep(self.sleep_secs)
-
-    # Read from given file(s) path
-    read_lines = self._read_with_retry()
-
-    # Compute checksum
-    self.checksum = utils.compute_hash(read_lines)
-    logging.info('Read from given path %s, %d lines, checksum: %s.',
-                 self.file_path, len(read_lines), self.checksum)
-    return self.checksum == self.expected_checksum
-
-  def describe_to(self, description):
-    description \
-      .append_text("Expected checksum is ") \
-      .append_text(self.expected_checksum)
-
-  def describe_mismatch(self, pipeline_result, mismatch_description):
-    mismatch_description \
-      .append_text("Actual checksum is ") \
-      .append_text(self.checksum)

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
deleted file mode 100644
index 909917d..0000000
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ /dev/null
@@ -1,148 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the test pipeline verifiers"""
-
-import logging
-import tempfile
-import unittest
-
-from hamcrest import assert_that as hc_assert_that
-from mock import Mock, patch
-
-from apache_beam.io.localfilesystem import LocalFileSystem
-from apache_beam.runners.runner import PipelineState
-from apache_beam.runners.runner import PipelineResult
-from apache_beam.tests import pipeline_verifiers as verifiers
-from apache_beam.tests.test_utils import patch_retry
-
-try:
-  # pylint: disable=wrong-import-order, wrong-import-position
-  # pylint: disable=ungrouped-imports
-  from apitools.base.py.exceptions import HttpError
-  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
-except ImportError:
-  HttpError = None
-  GCSFileSystem = None
-
-
-class PipelineVerifiersTest(unittest.TestCase):
-
-  def setUp(self):
-    self._mock_result = Mock()
-    patch_retry(self, verifiers)
-
-  def test_pipeline_state_matcher_success(self):
-    """Test PipelineStateMatcher successes when using default expected state
-    and job actually finished in DONE
-    """
-    pipeline_result = PipelineResult(PipelineState.DONE)
-    hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())
-
-  def test_pipeline_state_matcher_given_state(self):
-    """Test PipelineStateMatcher successes when matches given state"""
-    pipeline_result = PipelineResult(PipelineState.FAILED)
-    hc_assert_that(pipeline_result,
-                   verifiers.PipelineStateMatcher(PipelineState.FAILED))
-
-  def test_pipeline_state_matcher_fails(self):
-    """Test PipelineStateMatcher fails when using default expected state
-    and job actually finished in CANCELLED/DRAINED/FAILED/STOPPED/UNKNOWN
-    """
-    failed_state = [PipelineState.CANCELLED,
-                    PipelineState.DRAINED,
-                    PipelineState.FAILED,
-                    PipelineState.STOPPED,
-                    PipelineState.UNKNOWN]
-
-    for state in failed_state:
-      pipeline_result = PipelineResult(state)
-      with self.assertRaises(AssertionError):
-        hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())
-
-  test_cases = [
-      {'content': 'Test FileChecksumMatcher with single file',
-       'num_files': 1,
-       'expected_checksum': 'ebe16840cc1d0b4fe1cf71743e9d772fa31683b8'},
-      {'content': 'Test FileChecksumMatcher with multiple files',
-       'num_files': 3,
-       'expected_checksum': '58b3d3636de3891ac61afb8ace3b5025c3c37d44'},
-      {'content': '',
-       'num_files': 1,
-       'expected_checksum': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'},
-  ]
-
-  def create_temp_file(self, content, directory=None):
-    with tempfile.NamedTemporaryFile(delete=False, dir=directory) as f:
-      f.write(content)
-      return f.name
-
-  def test_file_checksum_matcher_success(self):
-    for case in self.test_cases:
-      temp_dir = tempfile.mkdtemp()
-      for _ in range(case['num_files']):
-        self.create_temp_file(case['content'], temp_dir)
-      matcher = verifiers.FileChecksumMatcher(temp_dir + '/*',
-                                              case['expected_checksum'])
-      hc_assert_that(self._mock_result, matcher)
-
-  @patch.object(LocalFileSystem, 'match')
-  def test_file_checksum_matcher_read_failed(self, mock_match):
-    mock_match.side_effect = IOError('No file found.')
-    matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
-    with self.assertRaises(IOError):
-      hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_match.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
-
-  @patch.object(GCSFileSystem, 'match')
-  @unittest.skipIf(HttpError is None, 'google-apitools is not installed')
-  def test_file_checksum_matcher_service_error(self, mock_match):
-    mock_match.side_effect = HttpError(
-        response={'status': '404'}, url='', content='Not Found',
-    )
-    matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
-    with self.assertRaises(HttpError):
-      hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_match.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
-
-  def test_file_checksum_matchcer_invalid_sleep_time(self):
-    with self.assertRaises(ValueError) as cm:
-      verifiers.FileChecksumMatcher('file_path',
-                                    'expected_checksum',
-                                    'invalid_sleep_time')
-    self.assertEqual(cm.exception.message,
-                     'Sleep seconds, if received, must be int. '
-                     'But received: \'invalid_sleep_time\', '
-                     '<type \'str\'>')
-
-  @patch('time.sleep', return_value=None)
-  def test_file_checksum_matcher_sleep_before_verify(self, mocked_sleep):
-    temp_dir = tempfile.mkdtemp()
-    case = self.test_cases[0]
-    self.create_temp_file(case['content'], temp_dir)
-    matcher = verifiers.FileChecksumMatcher(temp_dir + '/*',
-                                            case['expected_checksum'],
-                                            10)
-    hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mocked_sleep.called)
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/tests/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/test_utils.py b/sdks/python/apache_beam/tests/test_utils.py
deleted file mode 100644
index 666207e..0000000
--- a/sdks/python/apache_beam/tests/test_utils.py
+++ /dev/null
@@ -1,69 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Utility methods for testing"""
-
-import hashlib
-import imp
-from mock import Mock, patch
-
-from apache_beam.utils import retry
-
-DEFAULT_HASHING_ALG = 'sha1'
-
-
-def compute_hash(content, hashing_alg=DEFAULT_HASHING_ALG):
-  """Compute a hash value from a list of string."""
-  content.sort()
-  m = hashlib.new(hashing_alg)
-  for elem in content:
-    m.update(str(elem))
-  return m.hexdigest()
-
-
-def patch_retry(testcase, module):
-  """A function to patch retry module to use mock clock and logger.
-
-  Clock and logger that defined in retry decorator will be replaced in test
-  in order to skip sleep phase when retry happens.
-
-  Args:
-    testcase: An instance of unittest.TestCase that calls this function to
-      patch retry module.
-    module: The module that uses retry and need to be replaced with mock
-      clock and logger in test.
-  """
-  real_retry_with_exponential_backoff = retry.with_exponential_backoff
-
-  def patched_retry_with_exponential_backoff(num_retries, retry_filter):
-    """A patch for retry decorator to use a mock dummy clock and logger."""
-    return real_retry_with_exponential_backoff(
-        num_retries=num_retries, retry_filter=retry_filter, logger=Mock(),
-        clock=Mock())
-
-  patch.object(retry, 'with_exponential_backoff',
-               side_effect=patched_retry_with_exponential_backoff).start()
-
-  # Reload module after patching.
-  imp.reload(module)
-
-  def remove_patches():
-    patch.stopall()
-    # Reload module again after removing patch.
-    imp.reload(module)
-
-  testcase.addCleanup(remove_patches)

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index af76889..1822c19 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -22,7 +22,7 @@ import unittest
 import hamcrest as hc
 
 import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.core import CombineGlobally
 from apache_beam.transforms.core import Create

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/create_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py
index 2352acd..9ede4c7 100644
--- a/sdks/python/apache_beam/transforms/create_test.py
+++ b/sdks/python/apache_beam/transforms/create_test.py
@@ -22,7 +22,7 @@ from apache_beam.io import source_test_utils
 
 from apache_beam import Create, assert_that, equal_to
 from apache_beam.coders import FastPrimitivesCoder
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class CreateTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 5948460..137992d 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -32,7 +32,7 @@ from apache_beam.metrics.metric import MetricsFilter
 from apache_beam.io.iobase import Read
 from apache_beam.options.pipeline_options import TypeOptions
 import apache_beam.pvalue as pvalue
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms import window
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.display import DisplayData, DisplayDataItem

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index bf9aeff..0bc9107 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -23,7 +23,7 @@ import unittest
 from nose.plugins.attrib import attr
 
 import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms import window
 from apache_beam.transforms.util import assert_that, equal_to
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index c08a791..da7f43b 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -26,7 +26,7 @@ import yaml
 
 import apache_beam as beam
 from apache_beam.runners import pipeline_context
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms import trigger
 from apache_beam.transforms.core import Windowing
 from apache_beam.transforms.trigger import AccumulationMode

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 9656827..7fdef70 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -20,7 +20,7 @@
 import unittest
 
 from apache_beam import Create
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that, equal_to, is_empty
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 2d2b03d..a7797dd 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -20,7 +20,7 @@
 import unittest
 
 from apache_beam.runners import pipeline_context
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms import CombinePerKey
 from apache_beam.transforms import combiners
 from apache_beam.transforms import core

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index 3d7fbd9..27e7caa 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -22,7 +22,7 @@ import unittest
 import apache_beam as beam
 
 from apache_beam.io import iobase
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.util import assert_that, is_empty
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 3812fb1..3494cfe 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -23,10 +23,10 @@ import unittest
 import apache_beam as beam
 from apache_beam import pvalue
 from apache_beam import typehints
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.options.pipeline_options import OptionsContext
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.typehints import WithTypeHints
-from apache_beam.options.pipeline_options import OptionsContext
 
 # These test often construct a pipeline as value | PTransform to test side
 # effects (e.g. errors).

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/utils/test_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/test_stream.py b/sdks/python/apache_beam/utils/test_stream.py
deleted file mode 100644
index 7ae27b7..0000000
--- a/sdks/python/apache_beam/utils/test_stream.py
+++ /dev/null
@@ -1,163 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Provides TestStream for verifying streaming runner semantics."""
-
-from abc import ABCMeta
-from abc import abstractmethod
-
-from apache_beam import coders
-from apache_beam import pvalue
-from apache_beam.transforms import PTransform
-from apache_beam.transforms.window import TimestampedValue
-from apache_beam.utils import timestamp
-from apache_beam.utils.windowed_value import WindowedValue
-
-
-class Event(object):
-  """Test stream event to be emitted during execution of a TestStream."""
-
-  __metaclass__ = ABCMeta
-
-  def __cmp__(self, other):
-    if type(self) is not type(other):
-      return cmp(type(self), type(other))
-    return self._typed_cmp(other)
-
-  @abstractmethod
-  def _typed_cmp(self, other):
-    raise NotImplementedError
-
-
-class ElementEvent(Event):
-  """Element-producing test stream event."""
-
-  def __init__(self, timestamped_values):
-    self.timestamped_values = timestamped_values
-
-  def _typed_cmp(self, other):
-    return cmp(self.timestamped_values, other.timestamped_values)
-
-
-class WatermarkEvent(Event):
-  """Watermark-advancing test stream event."""
-
-  def __init__(self, new_watermark):
-    self.new_watermark = timestamp.Timestamp.of(new_watermark)
-
-  def _typed_cmp(self, other):
-    return cmp(self.new_watermark, other.new_watermark)
-
-
-class ProcessingTimeEvent(Event):
-  """Processing time-advancing test stream event."""
-
-  def __init__(self, advance_by):
-    self.advance_by = timestamp.Duration.of(advance_by)
-
-  def _typed_cmp(self, other):
-    return cmp(self.advance_by, other.advance_by)
-
-
-class TestStream(PTransform):
-  """Test stream that generates events on an unbounded PCollection of elements.
-
-  Each event emits elements, advances the watermark or advances the processing
-  time.  After all of the specified elements are emitted, ceases to produce
-  output.
-  """
-
-  def __init__(self, coder=coders.FastPrimitivesCoder):
-    assert coder is not None
-    self.coder = coder
-    self.current_watermark = timestamp.MIN_TIMESTAMP
-    self.events = []
-
-  def expand(self, pbegin):
-    assert isinstance(pbegin, pvalue.PBegin)
-    self.pipeline = pbegin.pipeline
-    return pvalue.PCollection(self.pipeline)
-
-  def _infer_output_coder(self, input_type=None, input_coder=None):
-    return self.coder
-
-  def _add(self, event):
-    if isinstance(event, ElementEvent):
-      for tv in event.timestamped_values:
-        assert tv.timestamp < timestamp.MAX_TIMESTAMP, (
-            'Element timestamp must be before timestamp.MAX_TIMESTAMP.')
-    elif isinstance(event, WatermarkEvent):
-      assert event.new_watermark > self.current_watermark, (
-          'Watermark must strictly-monotonically advance.')
-      self.current_watermark = event.new_watermark
-    elif isinstance(event, ProcessingTimeEvent):
-      assert event.advance_by > 0, (
-          'Must advance processing time by positive amount.')
-    else:
-      raise ValueError('Unknown event: %s' % event)
-    self.events.append(event)
-
-  def add_elements(self, elements):
-    """Add elements to the TestStream.
-
-    Elements added to the TestStream will be produced during pipeline execution.
-    These elements can be TimestampedValue, WindowedValue or raw unwrapped
-    elements that are serializable using the TestStream's specified Coder.  When
-    a TimestampedValue or a WindowedValue element is used, the timestamp of the
-    TimestampedValue or WindowedValue will be the timestamp of the produced
-    element; otherwise, the current watermark timestamp will be used for that
-    element.  The windows of a given WindowedValue are ignored by the
-    TestStream.
-    """
-    timestamped_values = []
-    for element in elements:
-      if isinstance(element, TimestampedValue):
-        timestamped_values.append(element)
-      elif isinstance(element, WindowedValue):
-        # Drop windows for elements in test stream.
-        timestamped_values.append(
-            TimestampedValue(element.value, element.timestamp))
-      else:
-        # Add elements with timestamp equal to current watermark.
-        timestamped_values.append(
-            TimestampedValue(element, self.current_watermark))
-    self._add(ElementEvent(timestamped_values))
-    return self
-
-  def advance_watermark_to(self, new_watermark):
-    """Advance the watermark to a given Unix timestamp.
-
-    The Unix timestamp value used must be later than the previous watermark
-    value and should be given as an int, float or utils.timestamp.Timestamp
-    object.
-    """
-    self._add(WatermarkEvent(new_watermark))
-    return self
-
-  def advance_watermark_to_infinity(self):
-    """Advance the watermark to the end of time."""
-    self.advance_watermark_to(timestamp.MAX_TIMESTAMP)
-    return self
-
-  def advance_processing_time(self, advance_by):
-    """Advance the current processing time by a given duration in seconds.
-
-    The duration must be a positive second duration and should be given as an
-    int, float or utils.timestamp.Duration object.
-    """
-    self._add(ProcessingTimeEvent(advance_by))
-    return self

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/apache_beam/utils/test_stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/test_stream_test.py b/sdks/python/apache_beam/utils/test_stream_test.py
deleted file mode 100644
index b5b5c69..0000000
--- a/sdks/python/apache_beam/utils/test_stream_test.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the test_stream module."""
-
-import unittest
-
-from apache_beam.transforms.window import TimestampedValue
-from apache_beam.utils import timestamp
-from apache_beam.utils.test_stream import ElementEvent
-from apache_beam.utils.test_stream import ProcessingTimeEvent
-from apache_beam.utils.test_stream import TestStream
-from apache_beam.utils.test_stream import WatermarkEvent
-from apache_beam.utils.windowed_value import WindowedValue
-
-
-class TestStreamTest(unittest.TestCase):
-
-  def test_basic_test_stream(self):
-    test_stream = (TestStream()
-                   .advance_watermark_to(0)
-                   .add_elements([
-                       'a',
-                       WindowedValue('b', 3, []),
-                       TimestampedValue('c', 6)])
-                   .advance_processing_time(10)
-                   .advance_watermark_to(8)
-                   .add_elements(['d'])
-                   .advance_watermark_to_infinity())
-    self.assertEqual(
-        test_stream.events,
-        [
-            WatermarkEvent(0),
-            ElementEvent([
-                TimestampedValue('a', 0),
-                TimestampedValue('b', 3),
-                TimestampedValue('c', 6),
-            ]),
-            ProcessingTimeEvent(10),
-            WatermarkEvent(8),
-            ElementEvent([
-                TimestampedValue('d', 8),
-            ]),
-            WatermarkEvent(timestamp.MAX_TIMESTAMP),
-        ]
-    )
-
-  def test_test_stream_errors(self):
-    with self.assertRaises(AssertionError, msg=(
-        'Watermark must strictly-monotonically advance.')):
-      _ = (TestStream()
-           .advance_watermark_to(5)
-           .advance_watermark_to(4))
-
-    with self.assertRaises(AssertionError, msg=(
-        'Must advance processing time by positive amount.')):
-      _ = (TestStream()
-           .advance_processing_time(-1))
-
-    with self.assertRaises(AssertionError, msg=(
-        'Element timestamp must be before timestamp.MAX_TIMESTAMP.')):
-      _ = (TestStream()
-           .add_elements([
-               TimestampedValue('a', timestamp.MAX_TIMESTAMP)
-           ]))
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/c028349c/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 681abbf..9bf3cf4 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -122,7 +122,7 @@ setuptools.setup(
     author_email=PACKAGE_EMAIL,
     packages=setuptools.find_packages(),
     package_data={'apache_beam': [
-        '*/*.pyx', '*/*/*.pyx', '*/*.pxd', '*/*/*.pxd', 'tests/data/*']},
+        '*/*.pyx', '*/*/*.pyx', '*/*.pxd', '*/*/*.pxd', 'testing/data/*']},
     ext_modules=cythonize([
         'apache_beam/**/*.pyx',
         'apache_beam/coders/coder_impl.py',


Mime
View raw message