beam-commits mailing list archives

From da...@apache.org
Subject [25/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder
Date Tue, 14 Jun 2016 23:13:00 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/processes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/processes_test.py b/sdks/python/apache_beam/utils/processes_test.py
new file mode 100644
index 0000000..eaaf06a
--- /dev/null
+++ b/sdks/python/apache_beam/utils/processes_test.py
@@ -0,0 +1,103 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Unit tests for the processes module."""
+
+import unittest
+
+
+import mock
+
+from google.cloud.dataflow.utils import processes
+
+
+class Exec(unittest.TestCase):
+
+  def setUp(self):
+    pass
+
+  @mock.patch('google.cloud.dataflow.utils.processes.subprocess')
+  def test_method_forwarding_not_windows(self, *unused_mocks):
+    # Test that the correct calls are being forwarded to the subprocess module
+    # when we are not on Windows.
+    processes.force_shell = False
+
+    processes.call(['subprocess', 'call'], shell=False, other_arg=True)
+    processes.subprocess.call.assert_called_once_with(
+        ['subprocess', 'call'],
+        shell=False,
+        other_arg=True)
+
+    processes.check_call(
+        ['subprocess', 'check_call'],
+        shell=False,
+        other_arg=True)
+    processes.subprocess.check_call.assert_called_once_with(
+        ['subprocess', 'check_call'],
+        shell=False,
+        other_arg=True)
+
+    processes.check_output(
+        ['subprocess', 'check_output'],
+        shell=False,
+        other_arg=True)
+    processes.subprocess.check_output.assert_called_once_with(
+        ['subprocess', 'check_output'],
+        shell=False,
+        other_arg=True)
+
+    processes.Popen(['subprocess', 'Popen'], shell=False, other_arg=True)
+    processes.subprocess.Popen.assert_called_once_with(
+        ['subprocess', 'Popen'],
+        shell=False,
+        other_arg=True)
+
+  @mock.patch('google.cloud.dataflow.utils.processes.subprocess')
+  def test_method_forwarding_windows(self, *unused_mocks):
+    # Test that the correct calls are being forwarded to the subprocess module
+    # and that the shell=True flag is added when we are on Windows.
+    processes.force_shell = True
+
+    processes.call(['subprocess', 'call'], shell=False, other_arg=True)
+    processes.subprocess.call.assert_called_once_with(
+        ['subprocess', 'call'],
+        shell=True,
+        other_arg=True)
+
+    processes.check_call(
+        ['subprocess', 'check_call'],
+        shell=False,
+        other_arg=True)
+    processes.subprocess.check_call.assert_called_once_with(
+        ['subprocess', 'check_call'],
+        shell=True,
+        other_arg=True)
+
+    processes.check_output(
+        ['subprocess', 'check_output'],
+        shell=False,
+        other_arg=True)
+    processes.subprocess.check_output.assert_called_once_with(
+        ['subprocess', 'check_output'],
+        shell=True,
+        other_arg=True)
+
+    processes.Popen(['subprocess', 'Popen'], shell=False, other_arg=True)
+    processes.subprocess.Popen.assert_called_once_with(
+        ['subprocess', 'Popen'],
+        shell=True,
+        other_arg=True)
+
+
+if __name__ == '__main__':
+  unittest.main()
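
The processes module under test is not part of this hunk, but the assertions
above pin down its shape: thin wrappers around the standard subprocess
functions that force shell=True whenever processes.force_shell is set (i.e. on
Windows). A rough sketch of such a wrapper, written here purely for
illustration and not taken from the commit:

    import platform
    import subprocess

    # On Windows, scripts and .bat entry points are only resolvable through
    # the shell, hence the forced shell=True.
    force_shell = platform.system() == 'Windows'

    def call(*args, **kwargs):
      if force_shell:
        kwargs['shell'] = True
      return subprocess.call(*args, **kwargs)

    def check_call(*args, **kwargs):
      if force_shell:
        kwargs['shell'] = True
      return subprocess.check_call(*args, **kwargs)

    def check_output(*args, **kwargs):
      if force_shell:
        kwargs['shell'] = True
      return subprocess.check_output(*args, **kwargs)

    def Popen(*args, **kwargs):
      if force_shell:
        kwargs['shell'] = True
      return subprocess.Popen(*args, **kwargs)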

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/profiler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py
new file mode 100644
index 0000000..a210e8c
--- /dev/null
+++ b/sdks/python/apache_beam/utils/profiler.py
@@ -0,0 +1,66 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A profiler context manager based on cProfile.Profile objects."""
+
+import cProfile
+import logging
+import os
+import pstats
+import StringIO
+import tempfile
+import time
+
+
+from google.cloud.dataflow.utils.dependency import _dependency_file_copy
+
+
+class Profile(object):
+  """cProfile wrapper context for saving and logging profiler results."""
+
+  SORTBY = 'cumulative'
+
+  def __init__(self, profile_id, profile_location=None, log_results=False):
+    self.stats = None
+    self.profile_id = str(profile_id)
+    self.profile_location = profile_location
+    self.log_results = log_results
+
+  def __enter__(self):
+    logging.info('Start profiling: %s', self.profile_id)
+    self.profile = cProfile.Profile()
+    self.profile.enable()
+    return self
+
+  def __exit__(self, *args):
+    self.profile.disable()
+    logging.info('Stop profiling: %s', self.profile_id)
+
+    if self.profile_location:
+      dump_location = os.path.join(
+          self.profile_location, 'profile',
+          ('%s-%s' % (time.strftime('%Y-%m-%d_%H_%M_%S'), self.profile_id)))
+      fd, filename = tempfile.mkstemp()
+      self.profile.dump_stats(filename)
+      logging.info('Copying profiler data to: [%s]', dump_location)
+      _dependency_file_copy(filename, dump_location)  # pylint: disable=protected-access
+      os.close(fd)
+      os.remove(filename)
+
+    if self.log_results:
+      s = StringIO.StringIO()
+      self.stats = pstats.Stats(
+          self.profile, stream=s).sort_stats(Profile.SORTBY)
+      self.stats.print_stats()
+      logging.info('Profiler data: [%s]', s.getvalue())
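
Profile is a context manager, so typical use wraps the section to be profiled
in a with statement. A minimal usage sketch (the profile id and the work done
inside the block are placeholders, not part of the commit; with no
profile_location given, results are only logged):

    # Import path assumes the apache_beam layout introduced by this move.
    from apache_beam.utils.profiler import Profile

    def expensive_work():
      total = 0
      for i in xrange(1000000):
        total += i * i
      return total

    with Profile('example-step', log_results=True):
      expensive_work()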

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/retry.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
new file mode 100644
index 0000000..78c9c98
--- /dev/null
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -0,0 +1,194 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Retry decorators for calls raising exceptions.
+
+This module is used mostly to decorate all integration points where the code
+makes calls to remote services. Searching through the code base for @retry
+should find all such places. For this reason, even places where retry is not
+needed right now use a @retry.no_retries decorator.
+"""
+
+import logging
+import random
+import sys
+import time
+import traceback
+
+from apitools.base.py.exceptions import HttpError
+
+
+class PermanentException(Exception):
+  """Base class for exceptions that should not be retried."""
+  pass
+
+
+class FuzzedExponentialIntervals(object):
+  """Iterable for intervals that are exponentially spaced, with fuzzing.
+
+  On iteration, yields retry interval lengths, in seconds. Every iteration over
+  this iterable will yield differently fuzzed interval lengths, as long as fuzz
+  is nonzero.
+
+  Args:
+    initial_delay_secs: The delay before the first retry, in seconds.
+    num_retries: The total number of times to retry.
+    factor: The exponential factor to use on subsequent retries.
+      Default is 2 (doubling).
+    fuzz: A value between 0 and 1, indicating the fraction of fuzz. For a
+      given delay d, the fuzzed delay is chosen uniformly from the interval
+      [(1 - fuzz) * d, d].
+    max_delay_secs: Maximum delay (in seconds). After this limit is reached,
+      further tries use max_delay_secs instead of exponentially increasing
+      the time. Defaults to 4 hours.
+  """
+
+  def __init__(self, initial_delay_secs, num_retries, factor=2, fuzz=0.5,
+               max_delay_secs=60 * 60 * 4):
+    self._initial_delay_secs = initial_delay_secs
+    self._num_retries = num_retries
+    self._factor = factor
+    if not 0 <= fuzz <= 1:
+      raise ValueError('Fuzz parameter expected to be in [0, 1] range.')
+    self._fuzz = fuzz
+    self._max_delay_secs = max_delay_secs
+
+  def __iter__(self):
+    current_delay_secs = min(self._max_delay_secs, self._initial_delay_secs)
+    for _ in xrange(self._num_retries):
+      fuzz_multiplier = 1 - self._fuzz + random.random() * self._fuzz
+      yield current_delay_secs * fuzz_multiplier
+      current_delay_secs = min(
+          self._max_delay_secs, current_delay_secs * self._factor)
+
+
+def retry_on_server_errors_filter(exception):
+  """Filter allowing retries on server errors and non-HttpErrors."""
+  if isinstance(exception, HttpError):
+    if exception.status_code >= 500:
+      return True
+    else:
+      return False
+  elif isinstance(exception, PermanentException):
+    return False
+  else:
+    # We may get here for non-HttpErrors such as socket timeouts, SSL
+    # exceptions, etc.
+    return True
+
+
+def retry_on_server_errors_and_timeout_filter(exception):
+  if isinstance(exception, HttpError):
+    if exception.status_code == 408:  # 408 Request Timeout
+      return True
+  return retry_on_server_errors_filter(exception)
+
+
+class Clock(object):
+  """A simple clock implementing sleep()."""
+
+  def sleep(self, value):
+    time.sleep(value)
+
+
+def no_retries(fun):
+  """A retry decorator for places where we do not want retries."""
+  return with_exponential_backoff(
+      retry_filter=lambda _: False, clock=None)(fun)
+
+
+def with_exponential_backoff(
+    num_retries=16, initial_delay_secs=5.0, logger=logging.warning,
+    retry_filter=retry_on_server_errors_filter,
+    clock=Clock(), fuzz=True):
+  """Decorator with arguments that control the retry logic.
+
+  Args:
+    num_retries: The total number of times to retry.
+    initial_delay_secs: The delay before the first retry, in seconds.
+    logger: A callable used to report an exception. Must have the same signature
+      as functions in the standard logging module. The default is
+      logging.warning.
+    retry_filter: A callable taking the raised exception and returning True
+      if the retry should happen. For instance, we do not want to retry on
+      404 HTTP errors most of the time. The default returns True for server
+      errors (HTTP status code >= 500) and for non-HTTP errors.
+    clock: A clock object implementing a sleep method. The default clock will
+      use time.sleep().
+    fuzz: True if the delay should be fuzzed (default). During testing False
+      can be used so that the delays are not randomized.
+
+  Returns:
+    As per the Python decorators-with-arguments pattern, a decorator for the
+    function, which in turn returns the wrapped (decorated) function.
+
+  The decorator is intended to be used on callables that make HTTP or RPC
+  requests that can temporarily time out or hit transient errors. For instance,
+  the make_http_request() function below will be retried 16 times with
+  exponential backoff and fuzzing of the delay interval (default settings).
+
+  from google.cloud.dataflow.utils import retry
+  # ...
+  @retry.with_exponential_backoff()
+  def make_http_request(args):
+    ...
+  """
+
+  def real_decorator(fun):
+    """The real decorator whose purpose is to return the wrapped function."""
+
+    retry_intervals = iter(
+        FuzzedExponentialIntervals(
+            initial_delay_secs, num_retries, fuzz=0.5 if fuzz else 0))
+
+    def wrapper(*args, **kwargs):
+      while True:
+        try:
+          return fun(*args, **kwargs)
+        except Exception as exn:  # pylint: disable=broad-except
+          if not retry_filter(exn):
+            raise
+          # Get the traceback object for the current exception. The
+          # sys.exc_info() function returns a tuple with three elements:
+          # exception type, exception value, and exception traceback.
+          exn_traceback = sys.exc_info()[2]
+          try:
+            try:
+              sleep_interval = retry_intervals.next()
+            except StopIteration:
+              # Re-raise the original exception since we finished the retries.
+              raise exn, None, exn_traceback
+
+            logger(
+                'Retry with exponential backoff: waiting for %s seconds before '
+                'retrying %s because we caught exception: %s '
+                'Traceback for above exception (most recent call last):\n%s',
+                sleep_interval,
+                getattr(fun, '__name__', str(fun)),
+                ''.join(traceback.format_exception_only(exn.__class__, exn)),
+                ''.join(traceback.format_tb(exn_traceback)))
+            clock.sleep(sleep_interval)
+          finally:
+            # Traceback objects in locals can cause reference cycles that will
+            # prevent garbage collection. Clear it now since we do not need
+            # it anymore.
+            sys.exc_clear()
+            exn_traceback = None
+
+    return wrapper
+
+  return real_decorator
+
+
+
+
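
Taken together, no_retries and with_exponential_backoff cover the two cases
described in the module docstring. A usage sketch for the decorated case
(flaky_fetch and its failure are hypothetical; the decorator arguments and the
import path, which assumes the apache_beam layout from this move, are the ones
defined above):

    from apache_beam.utils import retry

    @retry.with_exponential_backoff(num_retries=3, initial_delay_secs=1.0,
                                    fuzz=False)
    def flaky_fetch(url):
      # Stands in for an HTTP/RPC call that can fail transiently.
      raise IOError('transient failure talking to %s' % url)

With fuzz=False the waits before the three retries are exactly 1, 2 and 4
seconds (initial_delay_secs doubled by the default factor of 2); once the
retries are exhausted the original exception is re-raised with its original
traceback. The IOError is retried at all because the default
retry_on_server_errors_filter returns True for anything that is not an
HttpError or a PermanentException.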

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/retry_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/retry_test.py b/sdks/python/apache_beam/utils/retry_test.py
new file mode 100644
index 0000000..584654c
--- /dev/null
+++ b/sdks/python/apache_beam/utils/retry_test.py
@@ -0,0 +1,165 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the retry module."""
+
+import unittest
+
+from google.cloud.dataflow.utils import retry
+
+from apitools.base.py.exceptions import HttpError
+
+
+class FakeClock(object):
+  """A fake clock object implementing sleep() and recording calls."""
+
+  def __init__(self):
+    self.calls = []
+
+  def sleep(self, value):
+    self.calls.append(value)
+
+
+class FakeLogger(object):
+  """A fake logger object implementing log() and recording calls."""
+
+  def __init__(self):
+    self.calls = []
+
+  def log(self, message, interval, func_name, exn_name, exn_traceback):
+    _ = interval, exn_traceback
+    self.calls.append((message, func_name, exn_name))
+
+
+@retry.with_exponential_backoff(clock=FakeClock())
+def test_function(a, b):
+  _ = a, b
+  raise NotImplementedError
+
+
+@retry.with_exponential_backoff(initial_delay_secs=1.0, num_retries=1)
+def test_function_with_real_clock(a, b):
+  _ = a, b
+  raise NotImplementedError
+
+
+@retry.no_retries
+def test_no_retry_function(a, b):
+  _ = a, b
+  raise NotImplementedError
+
+
+class RetryTest(unittest.TestCase):
+
+  def setUp(self):
+    self.clock = FakeClock()
+    self.logger = FakeLogger()
+    self.calls = 0
+
+  def permanent_failure(self, a, b):
+    raise NotImplementedError
+
+  def transient_failure(self, a, b):
+    self.calls += 1
+    if self.calls > 8:
+      return a + b
+    raise NotImplementedError
+
+  def http_error(self, code):
+    raise HttpError({'status': str(code)}, '', '')
+
+  def test_with_explicit_decorator(self):
+    # We pass one argument as positional argument and one as keyword argument
+    # so that we cover both code paths for argument handling.
+    self.assertRaises(NotImplementedError, test_function, 10, b=20)
+
+  def test_with_no_retry_decorator(self):
+    self.assertRaises(NotImplementedError, test_no_retry_function, 1, 2)
+
+  def test_with_real_clock(self):
+    self.assertRaises(NotImplementedError,
+                      test_function_with_real_clock, 10, b=20)
+
+  def test_with_default_number_of_retries(self):
+    self.assertRaises(NotImplementedError,
+                      retry.with_exponential_backoff(clock=self.clock)(
+                          self.permanent_failure),
+                      10, b=20)
+    self.assertEqual(len(self.clock.calls), 16)
+
+  def test_with_explicit_number_of_retries(self):
+    self.assertRaises(NotImplementedError,
+                      retry.with_exponential_backoff(
+                          clock=self.clock, num_retries=10)(
+                              self.permanent_failure),
+                      10, b=20)
+    self.assertEqual(len(self.clock.calls), 10)
+
+  def test_with_http_error_that_should_not_be_retried(self):
+    self.assertRaises(HttpError,
+                      retry.with_exponential_backoff(
+                          clock=self.clock, num_retries=10)(
+                              self.http_error),
+                      404)
+    # Make sure just one call was made.
+    self.assertEqual(len(self.clock.calls), 0)
+
+  def test_with_http_error_that_should_be_retried(self):
+    self.assertRaises(HttpError,
+                      retry.with_exponential_backoff(
+                          clock=self.clock, num_retries=10)(
+                              self.http_error),
+                      500)
+    self.assertEqual(len(self.clock.calls), 10)
+
+  def test_with_explicit_initial_delay(self):
+    self.assertRaises(NotImplementedError,
+                      retry.with_exponential_backoff(
+                          initial_delay_secs=10.0, clock=self.clock,
+                          fuzz=False)(
+                              self.permanent_failure),
+                      10, b=20)
+    self.assertEqual(len(self.clock.calls), 16)
+    self.assertEqual(self.clock.calls[0], 10.0)
+
+  def test_log_calls_for_permanent_failure(self):
+    self.assertRaises(NotImplementedError,
+                      retry.with_exponential_backoff(
+                          clock=self.clock, logger=self.logger.log)(
+                              self.permanent_failure),
+                      10, b=20)
+    self.assertEqual(len(self.logger.calls), 16)
+    for message, func_name, exn_name in self.logger.calls:
+      self.assertTrue(message.startswith('Retry with exponential backoff:'))
+      self.assertEqual(exn_name, 'NotImplementedError\n')
+      self.assertEqual(func_name, 'permanent_failure')
+
+  def test_log_calls_for_transient_failure(self):
+    result = retry.with_exponential_backoff(
+        clock=self.clock, logger=self.logger.log, fuzz=False)(
+            self.transient_failure)(10, b=20)
+    self.assertEqual(result, 30)
+    self.assertEqual(len(self.clock.calls), 8)
+    self.assertEqual(self.clock.calls,
+                     [5.0 * 1, 5.0 * 2, 5.0 * 4, 5.0 * 8,
+                      5.0 * 16, 5.0 * 32, 5.0 * 64, 5.0 * 128])
+    self.assertEqual(len(self.logger.calls), 8)
+    for message, func_name, exn_name in self.logger.calls:
+      self.assertTrue(message.startswith('Retry with exponential backoff:'))
+      self.assertEqual(exn_name, 'NotImplementedError\n')
+      self.assertEqual(func_name, 'transient_failure')
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/version.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/version.py b/sdks/python/apache_beam/version.py
new file mode 100644
index 0000000..f489619
--- /dev/null
+++ b/sdks/python/apache_beam/version.py
@@ -0,0 +1,17 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Dataflow SDK for Python version information."""
+
+__version__ = '0.2.7'

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/__init__.py b/sdks/python/google/cloud/dataflow/__init__.py
deleted file mode 100644
index af28d3a..0000000
--- a/sdks/python/google/cloud/dataflow/__init__.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Google Cloud Dataflow SDK for Python.
-
-Google Cloud Dataflow <http://cloud.google.com/dataflow/>
-provides a simple, powerful programming model for building both batch
-and streaming parallel data processing pipelines.
-
-The Dataflow SDK for Python provides access to Dataflow capabilities
-from the Python programming language.
-
-Status
-------
-The SDK is still early in its development, and significant changes
-should be expected before the first stable version.
-
-Overview
---------
-The key concepts in this programming model are
-
-* PCollection:  represents a collection of data, which could be
-  bounded or unbounded in size.
-* PTransform:  represents a computation that transforms input
-  PCollections into output PCollections.
-* Pipeline:  manages a directed acyclic graph of PTransforms and
-  PCollections that is ready for execution.
-* Runner:  specifies where and how the Pipeline should execute.
-* Reading and Writing Data:  your pipeline can read from an external
-  source and write to an external data sink.
-
-Typical usage
--------------
-At the top of your source file::
-
-    import google.cloud.dataflow as df
-
-After this import statement
-
-* transform classes are available as df.FlatMap, df.GroupByKey, etc.
-* Pipeline class is available as df.Pipeline
-* text source/sink classes are available as df.io.TextFileSource,
-  df.io.TextFileSink
-
-Examples
---------
-The examples subdirectory has some examples.
-
-"""
-
-
-import sys
-
-
-if sys.version_info.major != 2:
-  raise RuntimeError(
-      'Dataflow SDK for Python is supported only on Python 2.7. '
-      'It is not supported on Python [%s].' % sys.version)
-
-
-import google.cloud.dataflow.internal.pickler
-
-from google.cloud.dataflow import coders
-from google.cloud.dataflow import io
-from google.cloud.dataflow import typehints
-from google.cloud.dataflow.pipeline import Pipeline
-from google.cloud.dataflow.transforms import *
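
As a rough illustration of the concepts listed in the docstring above (not
part of the commit; the runner name, transform labels and signatures are
assumed from examples shipped with the 0.2.x SDK rather than verified against
this exact revision):

    import google.cloud.dataflow as df

    # Assumed API: a pipeline on the local runner, a Create source and a Map
    # transform, each with an explicit label.
    p = df.Pipeline('DirectPipelineRunner')
    lengths = (p
               | df.Create('MakeWords', ['a', 'bb', 'ccc'])
               | df.Map('Lengths', len))
    p.run()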

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/__init__.py b/sdks/python/google/cloud/dataflow/coders/__init__.py
deleted file mode 100644
index 610a6ef..0000000
--- a/sdks/python/google/cloud/dataflow/coders/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from google.cloud.dataflow.coders.coders import *
-from google.cloud.dataflow.coders.typecoders import registry

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/coder_impl.pxd b/sdks/python/google/cloud/dataflow/coders/coder_impl.pxd
deleted file mode 100644
index 663d37d..0000000
--- a/sdks/python/google/cloud/dataflow/coders/coder_impl.pxd
+++ /dev/null
@@ -1,109 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# cython: profile=True
-
-cimport cython
-
-cimport cpython.ref
-cimport cpython.tuple
-cimport libc.stdint
-cimport libc.stdlib
-cimport libc.string
-
-from .stream cimport InputStream, OutputStream
-
-
-cdef object loads, dumps, create_InputStream, create_OutputStream
-cdef type WindowedValue
-
-
-cdef class CoderImpl(object):
-  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
-  cpdef decode_from_stream(self, InputStream stream, bint nested)
-  cpdef bytes encode(self, value)
-  cpdef decode(self, bytes encoded)
-
-
-cdef class SimpleCoderImpl(CoderImpl):
-  pass
-
-
-cdef class StreamCoderImpl(CoderImpl):
-  pass
-
-
-cdef class CallbackCoderImpl(CoderImpl):
-  cdef object _encoder
-  cdef object _decoder
-
-
-cdef class DeterministicPickleCoderImpl(CoderImpl):
-  cdef CoderImpl _pickle_coder
-  cdef object _step_label
-  cdef bint _check_safe(self, value) except -1
-
-
-cdef class BytesCoderImpl(CoderImpl):
-  pass
-
-
-cdef class FloatCoderImpl(StreamCoderImpl):
-  pass
-
-
-cdef class TimestampCoderImpl(StreamCoderImpl):
-  cdef object timestamp_class
-
-
-cdef list small_ints
-cdef class VarIntCoderImpl(StreamCoderImpl):
-  @cython.locals(ivalue=libc.stdint.int64_t)
-  cpdef bytes encode(self, value)
-
-
-cdef class SingletonCoderImpl(CoderImpl):
-  cdef object _value
-
-
-cdef class AbstractComponentCoderImpl(StreamCoderImpl):
-  cdef tuple _coder_impls
-
-  cpdef _extract_components(self, value)
-  cpdef _construct_from_components(self, components)
-
-  @cython.locals(c=CoderImpl)
-  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
-  @cython.locals(c=CoderImpl)
-  cpdef decode_from_stream(self, InputStream stream, bint nested)
-
-
-cdef class TupleCoderImpl(AbstractComponentCoderImpl):
-  pass
-
-
-cdef class SequenceCoderImpl(StreamCoderImpl):
-  cdef CoderImpl _elem_coder
-  cpdef _construct_from_sequence(self, values)
-
-
-cdef class TupleSequenceCoderImpl(SequenceCoderImpl):
-  pass
-
-
-cdef class WindowedValueCoderImpl(StreamCoderImpl):
-  """A coder for windowed values."""
-  cdef CoderImpl _value_coder
-  cdef CoderImpl _timestamp_coder
-  cdef CoderImpl _windows_coder
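
The .pxd file above augments coder_impl.py (deleted just below): when the
module is compiled with Cython, these declarations add static C types to the
otherwise pure-Python classes; when it is not compiled, the .py file runs
unchanged. A minimal sketch of the same pattern, with hypothetical file names:

    # mymodule.py -- runs as plain Python when not compiled
    def dot(xs, ys):
      total = 0.0
      for i in range(len(xs)):
        total += xs[i] * ys[i]
      return total

    # mymodule.pxd -- consulted only when compiling mymodule.py with Cython
    cimport cython

    @cython.locals(total=cython.double, i=cython.Py_ssize_t)
    cpdef double dot(list xs, list ys)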

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/coder_impl.py b/sdks/python/google/cloud/dataflow/coders/coder_impl.py
deleted file mode 100644
index 0ce4354..0000000
--- a/sdks/python/google/cloud/dataflow/coders/coder_impl.py
+++ /dev/null
@@ -1,316 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Coder implementations.
-
-The actual encode/decode implementations are split off from coders to
-allow conditional (compiled/pure) implementations, which can be used to
-encode many elements with minimal overhead.
-
-This module may be optionally compiled with Cython, using the corresponding
-coder_impl.pxd file for type hints.
-"""
-
-import collections
-from cPickle import loads, dumps
-
-
-# pylint: disable=g-import-not-at-top
-try:
-  # Don't depend on the full dataflow sdk to test coders.
-  from google.cloud.dataflow.transforms.window import WindowedValue
-except ImportError:
-  WindowedValue = collections.namedtuple(
-      'WindowedValue', ('value', 'timestamp', 'windows'))
-
-try:
-  from stream import InputStream as create_InputStream
-  from stream import OutputStream as create_OutputStream
-except ImportError:
-  from slow_stream import InputStream as create_InputStream
-  from slow_stream import OutputStream as create_OutputStream
-# pylint: enable=g-import-not-at-top
-
-
-class CoderImpl(object):
-
-  def encode_to_stream(self, value, stream, nested):
-    """Reads object from potentially-nested encoding in stream."""
-    raise NotImplementedError
-
-  def decode_from_stream(self, stream, nested):
-    """Reads object from potentially-nested encoding in stream."""
-    raise NotImplementedError
-
-  def encode(self, value):
-    """Encodes an object to an unnested string."""
-    raise NotImplementedError
-
-  def decode(self, encoded):
-    """Encodes an object to an unnested string."""
-    raise NotImplementedError
-
-
-class SimpleCoderImpl(CoderImpl):
-  """Subclass of CoderImpl implementing stream methods using encode/decode."""
-
-  def encode_to_stream(self, value, stream, nested):
-    """Reads object from potentially-nested encoding in stream."""
-    stream.write(self.encode(value), nested)
-
-  def decode_from_stream(self, stream, nested):
-    """Reads object from potentially-nested encoding in stream."""
-    return self.decode(stream.read_all(nested))
-
-
-class StreamCoderImpl(CoderImpl):
-  """Subclass of CoderImpl implementing encode/decode using stream methods."""
-
-  def encode(self, value):
-    out = create_OutputStream()
-    self.encode_to_stream(value, out, False)
-    return out.get()
-
-  def decode(self, encoded):
-    return self.decode_from_stream(create_InputStream(encoded), False)
-
-
-class CallbackCoderImpl(CoderImpl):
-  """A CoderImpl that calls back to the _impl methods on the Coder itself.
-
-  This is the default implementation used if Coder._get_impl()
-  is not overwritten.
-  """
-
-  def __init__(self, encoder, decoder):
-    self._encoder = encoder
-    self._decoder = decoder
-
-  def encode_to_stream(self, value, stream, nested):
-    return stream.write(self._encoder(value), nested)
-
-  def decode_from_stream(self, stream, nested):
-    return self._decoder(stream.read_all(nested))
-
-  def encode(self, value):
-    return self._encoder(value)
-
-  def decode(self, encoded):
-    return self._decoder(encoded)
-
-
-class DeterministicPickleCoderImpl(CoderImpl):
-
-  def __init__(self, pickle_coder, step_label):
-    self._pickle_coder = pickle_coder
-    self._step_label = step_label
-
-  def _check_safe(self, value):
-    if isinstance(value, (str, unicode, long, int, float)):
-      pass
-    elif value is None:
-      pass
-    elif isinstance(value, (tuple, list)):
-      for x in value:
-        self._check_safe(x)
-    else:
-      raise TypeError(
-          "Unable to deterministically code '%s' of type '%s', "
-          "please provide a type hint for the input of '%s'" % (
-              value, type(value), self._step_label))
-
-  def encode_to_stream(self, value, stream, nested):
-    self._check_safe(value)
-    return self._pickle_coder.encode_to_stream(value, stream, nested)
-
-  def decode_from_stream(self, stream, nested):
-    return self._pickle_coder.decode_from_stream(stream, nested)
-
-  def encode(self, value):
-    self._check_safe(value)
-    return self._pickle_coder.encode(value)
-
-  def decode(self, encoded):
-    return self._pickle_coder.decode(encoded)
-
-
-class BytesCoderImpl(CoderImpl):
-  """A coder for bytes/str objects."""
-
-  def encode_to_stream(self, value, out, nested):
-    out.write(value, nested)
-
-  def decode_from_stream(self, in_stream, nested):
-    return in_stream.read_all(nested)
-
-  def encode(self, value):
-    assert isinstance(value, bytes), (value, type(value))
-    return value
-
-  def decode(self, encoded):
-    return encoded
-
-
-class FloatCoderImpl(StreamCoderImpl):
-
-  def encode_to_stream(self, value, out, nested):
-    out.write_bigendian_double(value)
-
-  def decode_from_stream(self, in_stream, nested):
-    return in_stream.read_bigendian_double()
-
-
-class TimestampCoderImpl(StreamCoderImpl):
-
-  def __init__(self, timestamp_class):
-    self.timestamp_class = timestamp_class
-
-  def encode_to_stream(self, value, out, nested):
-    out.write_bigendian_int64(value.micros)
-
-  def decode_from_stream(self, in_stream, nested):
-    return self.timestamp_class(micros=in_stream.read_bigendian_int64())
-
-
-small_ints = [chr(_) for _ in range(128)]
-
-
-class VarIntCoderImpl(StreamCoderImpl):
-  """A coder for long/int objects."""
-
-  def encode_to_stream(self, value, out, nested):
-    out.write_var_int64(value)
-
-  def decode_from_stream(self, in_stream, nested):
-    return in_stream.read_var_int64()
-
-  def encode(self, value):
-    ivalue = value  # type cast
-    if 0 <= ivalue < len(small_ints):
-      return small_ints[ivalue]
-    else:
-      return StreamCoderImpl.encode(self, value)
-
-  def decode(self, encoded):
-    if len(encoded) == 1:
-      i = ord(encoded)
-      if 0 <= i < 128:
-        return i
-    return StreamCoderImpl.decode(self, encoded)
-
-
-class SingletonCoderImpl(CoderImpl):
-  """A coder that always encodes exactly one value."""
-
-  def __init__(self, value):
-    self._value = value
-
-  def encode_to_stream(self, value, stream, nested):
-    pass
-
-  def decode_from_stream(self, stream, nested):
-    return self._value
-
-  def encode(self, value):
-    b = ''  # avoid byte vs str vs unicode error
-    return b
-
-  def decode(self, encoded):
-    return self._value
-
-
-class AbstractComponentCoderImpl(StreamCoderImpl):
-
-  def __init__(self, coder_impls):
-    for c in coder_impls:
-      assert isinstance(c, CoderImpl), c
-    self._coder_impls = tuple(coder_impls)
-
-  def _extract_components(self, value):
-    raise NotImplementedError
-
-  def _construct_from_components(self, components):
-    raise NotImplementedError
-
-  def encode_to_stream(self, value, out, nested):
-    values = self._extract_components(value)
-    if len(self._coder_impls) != len(values):
-      raise ValueError(
-          'Number of components does not match number of coders.')
-    for i in range(0, len(self._coder_impls)):
-      c = self._coder_impls[i]   # type cast
-      c.encode_to_stream(values[i], out, True)
-
-  def decode_from_stream(self, in_stream, nested):
-    return self._construct_from_components(
-        [c.decode_from_stream(in_stream, True) for c in self._coder_impls])
-
-
-class TupleCoderImpl(AbstractComponentCoderImpl):
-  """A coder for tuple objects."""
-
-  def _extract_components(self, value):
-    return value
-
-  def _construct_from_components(self, components):
-    return tuple(components)
-
-
-class SequenceCoderImpl(StreamCoderImpl):
-  """A coder for sequences of known length."""
-
-  def __init__(self, elem_coder):
-    self._elem_coder = elem_coder
-
-  def _construct_from_sequence(self, values):
-    raise NotImplementedError
-
-  def encode_to_stream(self, value, out, nested):
-    # Compatible with Java's IterableLikeCoder.
-    out.write_bigendian_int32(len(value))
-    for elem in value:
-      self._elem_coder.encode_to_stream(elem, out, True)
-
-  def decode_from_stream(self, in_stream, nested):
-    size = in_stream.read_bigendian_int32()
-    return self._construct_from_sequence(
-        [self._elem_coder.decode_from_stream(in_stream, True)
-         for _ in range(size)])
-
-
-class TupleSequenceCoderImpl(SequenceCoderImpl):
-  """A coder for homogeneous tuple objects."""
-
-  def _construct_from_sequence(self, components):
-    return tuple(components)
-
-
-class WindowedValueCoderImpl(StreamCoderImpl):
-  """A coder for windowed values."""
-
-  def __init__(self, value_coder, timestamp_coder, window_coder):
-    self._value_coder = value_coder
-    self._timestamp_coder = timestamp_coder
-    self._windows_coder = TupleSequenceCoderImpl(window_coder)
-
-  def encode_to_stream(self, value, out, nested):
-    self._value_coder.encode_to_stream(value.value, out, True)
-    self._timestamp_coder.encode_to_stream(value.timestamp, out, True)
-    self._windows_coder.encode_to_stream(value.windows, out, True)
-
-  def decode_from_stream(self, in_stream, nested):
-    return WindowedValue(
-        self._value_coder.decode_from_stream(in_stream, True),
-        self._timestamp_coder.decode_from_stream(in_stream, True),
-        self._windows_coder.decode_from_stream(in_stream, True))
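
One detail worth calling out from the implementations above: VarIntCoderImpl
keeps a table of 128 pre-built one-byte encodings so that small non-negative
integers never touch the stream machinery. A standalone sketch of that fast
path (the varint helpers are written inline for the example and only handle
non-negative values; the real implementation delegates to the stream module's
write_var_int64/read_var_int64):

    small_ints = [chr(i) for i in range(128)]

    def encode_varint(value):
      if 0 <= value < len(small_ints):
        return small_ints[value]        # fast path: one byte, no stream
      out = []
      while True:
        bits = value & 0x7F
        value >>= 7
        if value:
          out.append(chr(bits | 0x80))  # continuation bit set
        else:
          out.append(chr(bits))
          return ''.join(out)

    def decode_varint(encoded):
      if len(encoded) == 1 and ord(encoded) < 128:
        return ord(encoded)             # mirrors VarIntCoderImpl.decode
      result, shift = 0, 0
      for ch in encoded:
        result |= (ord(ch) & 0x7F) << shift
        shift += 7
      return result

    assert decode_varint(encode_varint(5)) == 5
    assert decode_varint(encode_varint(300)) == 300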

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/coders.py b/sdks/python/google/cloud/dataflow/coders/coders.py
deleted file mode 100644
index 16edff0..0000000
--- a/sdks/python/google/cloud/dataflow/coders/coders.py
+++ /dev/null
@@ -1,511 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Collection of useful coders."""
-
-import base64
-import collections
-import cPickle as pickle
-
-from google.cloud.dataflow.coders import coder_impl
-
-
-# pylint: disable=g-import-not-at-top
-# Avoid dependencies on the full SDK.
-try:
-  # Import dill from the pickler module to make sure our monkey-patching of dill
-  # occurs.
-  from google.cloud.dataflow.internal.pickler import dill
-  from google.cloud.dataflow.transforms.timeutil import Timestamp
-except ImportError:
-  # We fall back to using the stock dill library in tests that don't use the
-  # full Python SDK.
-  import dill
-  Timestamp = collections.namedtuple('Timestamp', 'micros')
-
-
-def serialize_coder(coder):
-  from google.cloud.dataflow.internal import pickler
-  return '%s$%s' % (coder.__class__.__name__, pickler.dumps(coder))
-
-
-def deserialize_coder(serialized):
-  from google.cloud.dataflow.internal import pickler
-  return pickler.loads(serialized.split('$', 1)[1])
-# pylint: enable=g-import-not-at-top
-
-
-class Coder(object):
-  """Base class for coders."""
-
-  def encode(self, value):
-    """Encodes the given object into a byte string."""
-    raise NotImplementedError('Encode not implemented: %s.' % self)
-
-  def decode(self, encoded):
-    """Decodes the given byte string into the corresponding object."""
-    raise NotImplementedError('Decode not implemented: %s.' % self)
-
-  def is_deterministic(self):
-    """Whether this coder is guaranteed to encode values deterministically.
-
-    A deterministic coder is required for key coders in GroupByKey operations
-    to produce consistent results.
-
-    For example, note that the default coder, the PickleCoder, is not
-    deterministic: the ordering of pickled entries in maps may vary across
-    executions since there is no defined order, and such a coder is not in
-    general suitable for usage as a key coder in GroupByKey operations, since
-    each instance of the same key may be encoded differently.
-
-    Returns:
-      Whether coder is deterministic.
-    """
-    return False
-
-  # ===========================================================================
-  # Methods below are internal SDK details that don't need to be modified for
-  # user-defined coders.
-  # ===========================================================================
-
-  def _create_impl(self):
-    """Creates a CoderImpl to do the actual encoding and decoding.
-    """
-    return coder_impl.CallbackCoderImpl(self.encode, self.decode)
-
-  def get_impl(self):
-    if not hasattr(self, '_impl'):
-      self._impl = self._create_impl()
-      assert isinstance(self._impl, coder_impl.CoderImpl)
-    return self._impl
-
-  def __getstate__(self):
-    return self._dict_without_impl()
-
-  def _dict_without_impl(self):
-    if hasattr(self, '_impl'):
-      d = dict(self.__dict__)
-      del d['_impl']
-      return d
-    else:
-      return self.__dict__
-
-  @classmethod
-  def from_type_hint(cls, unused_typehint, unused_registry):
-    # If not overridden, just construct the coder without arguments.
-    return cls()
-
-  def is_kv_coder(self):
-    return False
-
-  def key_coder(self):
-    if self.is_kv_coder():
-      raise NotImplementedError('key_coder: %s' % self)
-    else:
-      raise ValueError('Not a KV coder: %s.' % self)
-
-  def value_coder(self):
-    if self.is_kv_coder():
-      raise NotImplementedError('value_coder: %s' % self)
-    else:
-      raise ValueError('Not a KV coder: %s.' % self)
-
-  def _get_component_coders(self):
-    """Returns the internal component coders of this coder."""
-    # This is an internal detail of the Coder API and does not need to be
-    # refined in user-defined Coders.
-    return []
-
-  def as_cloud_object(self):
-    """Returns Google Cloud Dataflow API description of this coder."""
-    # This is an internal detail of the Coder API and does not need to be
-    # refined in user-defined Coders.
-
-    value = {
-        # We pass coders in the form "<coder_name>$<pickled_data>" to make the
-        # job description JSON more readable.  Data before the $ is ignored by
-        # the worker.
-        '@type': serialize_coder(self),
-        'component_encodings': list(
-            component.as_cloud_object()
-            for component in self._get_component_coders())
-    }
-    return value
-
-  def __repr__(self):
-    return self.__class__.__name__
-
-  def __eq__(self, other):
-    # pylint: disable=protected-access
-    return (self.__class__ == other.__class__
-       and self._dict_without_impl() == other._dict_without_impl())
-    # pylint: enable=protected-access
-
-
-class StrUtf8Coder(Coder):
-  """A coder used for reading and writing strings as UTF-8."""
-
-  def encode(self, value):
-    return value.encode('utf-8')
-
-  def decode(self, value):
-    return value.decode('utf-8')
-
-  def is_deterministic(self):
-    return True
-
-
-class ToStringCoder(Coder):
-  """A default string coder used if no sink coder is specified."""
-
-  def encode(self, value):
-    if isinstance(value, unicode):
-      return value.encode('utf-8')
-    elif isinstance(value, str):
-      return value
-    else:
-      return str(value)
-
-  def decode(self, _):
-    raise NotImplementedError('ToStringCoder cannot be used for decoding.')
-
-  def is_deterministic(self):
-    return True
-
-
-class FastCoder(Coder):
-  """Coder subclass used when a (faster) CoderImpl is supplied directly.
-
-  The Coder class defines _create_impl in terms of encode() and decode();
-  this class inverts that, defining encode() and decode() in terms of
-  _create_impl().
-  """
-
-  def encode(self, value):
-    """Encodes the given object into a byte string."""
-    return self.get_impl().encode(value)
-
-  def decode(self, encoded):
-    """Decodes the given byte string into the corresponding object."""
-    return self.get_impl().decode(encoded)
-
-  def _create_impl(self):
-    raise NotImplementedError
-
-
-class BytesCoder(FastCoder):
-  """Byte string coder."""
-
-  def _create_impl(self):
-    return coder_impl.BytesCoderImpl()
-
-  def is_deterministic(self):
-    return True
-
-
-class VarIntCoder(FastCoder):
-  """Variable-length integer coder."""
-
-  def _create_impl(self):
-    return coder_impl.VarIntCoderImpl()
-
-  def is_deterministic(self):
-    return True
-
-
-class FloatCoder(FastCoder):
-  """A coder used for floating-point values."""
-
-  def _create_impl(self):
-    return coder_impl.FloatCoderImpl()
-
-  def is_deterministic(self):
-    return True
-
-
-class TimestampCoder(FastCoder):
-  """A coder used for timeutil.Timestamp values."""
-
-  def _create_impl(self):
-    return coder_impl.TimestampCoderImpl(Timestamp)
-
-  def is_deterministic(self):
-    return True
-
-
-class SingletonCoder(FastCoder):
-  """A coder that always encodes exactly one value."""
-
-  def __init__(self, value):
-    self._value = value
-
-  def _create_impl(self):
-    return coder_impl.SingletonCoderImpl(self._value)
-
-  def is_deterministic(self):
-    return True
-
-
-def maybe_dill_dumps(o):
-  """Pickle using cPickle or the Dill pickler as a fallback."""
-  # We need to use the dill pickler for objects of certain custom classes,
-  # including, for example, ones that contain lambdas.
-  try:
-    return pickle.dumps(o)
-  except Exception:  # pylint: disable=broad-except
-    return dill.dumps(o)
-
-
-def maybe_dill_loads(o):
-  """Unpickle using cPickle or the Dill pickler as a fallback."""
-  try:
-    return pickle.loads(o)
-  except Exception:  # pylint: disable=broad-except
-    return dill.loads(o)
-
-
-class _PickleCoderBase(FastCoder):
-  """Base class for pickling coders."""
-
-  def is_deterministic(self):
-    # Note that the default coder, the PickleCoder, is not deterministic (for
-    # example, the ordering of pickled entries in maps may vary across
-    # executions), and so is not in general suitable for usage as a key coder in
-    # GroupByKey operations.
-    return False
-
-  def as_cloud_object(self, is_pair_like=True):
-    value = super(_PickleCoderBase, self).as_cloud_object()
-    # We currently use this coder in places where we cannot infer the coder to
-    # use for the value type in a more granular way.  In places where the
-    # service expects a pair, it checks for the "is_pair_like" key, in which
-    # case we would fail without the hack below.
-    if is_pair_like:
-      value['is_pair_like'] = True
-      value['component_encodings'] = [
-          self.as_cloud_object(is_pair_like=False),
-          self.as_cloud_object(is_pair_like=False)
-      ]
-
-    return value
-
-  # We allow .key_coder() and .value_coder() to be called on PickleCoder since
-  # we can't always infer the return values of lambdas in ParDo operations, the
-  # result of which may be used in a GroupByKey.
-  def is_kv_coder(self):
-    return True
-
-  def key_coder(self):
-    return self
-
-  def value_coder(self):
-    return self
-
-
-class PickleCoder(_PickleCoderBase):
-  """Coder using Python's pickle functionality."""
-
-  def _create_impl(self):
-    return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads)
-
-
-class DillCoder(_PickleCoderBase):
-  """Coder using dill's pickle functionality."""
-
-  def _create_impl(self):
-    return coder_impl.CallbackCoderImpl(maybe_dill_dumps, maybe_dill_loads)
-
-
-class DeterministicPickleCoder(FastCoder):
-  """Throws runtime errors when pickling non-deterministic values."""
-
-  def __init__(self, pickle_coder, step_label):
-    self._pickle_coder = pickle_coder
-    self._step_label = step_label
-
-  def _create_impl(self):
-    return coder_impl.DeterministicPickleCoderImpl(
-        self._pickle_coder.get_impl(), self._step_label)
-
-  def is_deterministic(self):
-    return True
-
-  def is_kv_coder(self):
-    return True
-
-  def key_coder(self):
-    return self
-
-  def value_coder(self):
-    return self
-
-
-class Base64PickleCoder(Coder):
-  """Coder of objects by Python pickle, then base64 encoding."""
-  # TODO(robertwb): Do base64 encoding where it's needed (e.g. in json) rather
-  # than via a special Coder.
-
-  def encode(self, value):
-    return base64.b64encode(pickle.dumps(value))
-
-  def decode(self, encoded):
-    return pickle.loads(base64.b64decode(encoded))
-
-  def is_deterministic(self):
-    # Note that the Base64PickleCoder is not deterministic.  See the
-    # corresponding comments for PickleCoder above.
-    return False
-
-  # We allow .key_coder() and .value_coder() to be called on Base64PickleCoder
-  # since we can't always infer the return values of lambdas in ParDo
-  # operations, the result of which may be used in a GroupByKey.
-  #
-  # TODO(ccy): this is currently only used for KV values from Create transforms.
-  # Investigate a way to unify this with PickleCoder.
-  def is_kv_coder(self):
-    return True
-
-  def key_coder(self):
-    return self
-
-  def value_coder(self):
-    return self
-
-
-class TupleCoder(FastCoder):
-  """Coder of tuple objects."""
-
-  def __init__(self, components):
-    self._coders = tuple(components)
-
-  def _create_impl(self):
-    return coder_impl.TupleCoderImpl([c.get_impl() for c in self._coders])
-
-  def is_deterministic(self):
-    return all(c.is_deterministic() for c in self._coders)
-
-  @staticmethod
-  def from_type_hint(typehint, registry):
-    return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])
-
-  def as_cloud_object(self):
-    value = super(TupleCoder, self).as_cloud_object()
-    value['is_pair_like'] = True
-    return value
-
-  def _get_component_coders(self):
-    return self.coders()
-
-  def coders(self):
-    return self._coders
-
-  def is_kv_coder(self):
-    return len(self._coders) == 2
-
-  def key_coder(self):
-    if len(self._coders) != 2:
-      raise ValueError('TupleCoder does not have exactly 2 components.')
-    return self._coders[0]
-
-  def value_coder(self):
-    if len(self._coders) != 2:
-      raise ValueError('TupleCoder does not have exactly 2 components.')
-    return self._coders[1]
-
-  def __repr__(self):
-    return 'TupleCoder[%s]' % ', '.join(str(c) for c in self._coders)
-
-
-class TupleSequenceCoder(FastCoder):
-  """Coder of homogeneous tuple objects."""
-
-  def __init__(self, elem_coder):
-    self._elem_coder = elem_coder
-
-  def _create_impl(self):
-    return coder_impl.TupleSequenceCoderImpl(self._elem_coder.get_impl())
-
-  def is_deterministic(self):
-    return self._elem_coder.is_deterministic()
-
-  @staticmethod
-  def from_type_hint(typehint, registry):
-    return TupleSequenceCoder(registry.get_coder(typehint.inner_type))
-
-  def _get_component_coders(self):
-    return (self._elem_coder,)
-
-  def __repr__(self):
-    return 'TupleSequenceCoder[%r]' % self._elem_coder
-
-
-class WindowCoder(PickleCoder):
-  """Coder for windows in windowed values."""
-
-  def _create_impl(self):
-    return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads)
-
-  def is_deterministic(self):
-    # Note that WindowCoder as implemented is not deterministic because the
-    # implementation simply pickles windows.  See the corresponding comments
-    # on PickleCoder for more details.
-    return False
-
-  def as_cloud_object(self):
-    return super(WindowCoder, self).as_cloud_object(is_pair_like=False)
-
-
-class WindowedValueCoder(FastCoder):
-  """Coder for windowed values."""
-
-  def __init__(self, wrapped_value_coder, timestamp_coder=None,
-               window_coder=None):
-    if not timestamp_coder:
-      timestamp_coder = TimestampCoder()
-    if not window_coder:
-      window_coder = PickleCoder()
-    self.wrapped_value_coder = wrapped_value_coder
-    self.timestamp_coder = timestamp_coder
-    self.window_coder = window_coder
-
-  def _create_impl(self):
-    return coder_impl.WindowedValueCoderImpl(
-        self.wrapped_value_coder.get_impl(),
-        self.timestamp_coder.get_impl(),
-        self.window_coder.get_impl())
-
-  def is_deterministic(self):
-    return all(c.is_deterministic() for c in [self.wrapped_value_coder,
-                                              self.timestamp_coder,
-                                              self.window_coder])
-
-  def as_cloud_object(self):
-    value = super(WindowedValueCoder, self).as_cloud_object()
-    value['is_wrapper'] = True
-    return value
-
-  def _get_component_coders(self):
-    return [self.wrapped_value_coder, self.timestamp_coder, self.window_coder]
-
-  def is_kv_coder(self):
-    return self.wrapped_value_coder.is_kv_coder()
-
-  def key_coder(self):
-    return self.wrapped_value_coder.key_coder()
-
-  def value_coder(self):
-    return self.wrapped_value_coder.value_coder()
-
-  def __repr__(self):
-    return 'WindowedValueCoder[%s]' % self.wrapped_value_coder
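
A user-defined coder only needs the public extension points documented above
(encode, decode, is_deterministic and, if useful, from_type_hint); because
_create_impl is not overridden, Coder.get_impl() wraps encode/decode in a
CallbackCoderImpl automatically. A hypothetical example (JsonCoder is not
defined anywhere in this commit; the import path assumes the apache_beam
layout introduced by this move):

    import json

    from apache_beam.coders.coders import Coder

    class JsonCoder(Coder):
      """Encodes simple Python values as JSON byte strings."""

      def encode(self, value):
        return json.dumps(value)

      def decode(self, encoded):
        return json.loads(encoded)

      def is_deterministic(self):
        # Dict key order is not guaranteed when serializing, so this coder
        # should not be used as a key coder in GroupByKey.
        return False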

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/coders_test.py b/sdks/python/google/cloud/dataflow/coders/coders_test.py
deleted file mode 100644
index d11d310..0000000
--- a/sdks/python/google/cloud/dataflow/coders/coders_test.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import base64
-import logging
-import unittest
-
-from google.cloud.dataflow import coders
-
-
-class PickleCoderTest(unittest.TestCase):
-
-  def test_basics(self):
-    v = ('a' * 10, 'b' * 90)
-    pickler = coders.PickleCoder()
-    self.assertEquals(v, pickler.decode(pickler.encode(v)))
-    pickler = coders.Base64PickleCoder()
-    self.assertEquals(v, pickler.decode(pickler.encode(v)))
-    self.assertEquals(
-        coders.Base64PickleCoder().encode(v),
-        base64.b64encode(coders.PickleCoder().encode(v)))
-
-  def test_equality(self):
-    self.assertEquals(coders.PickleCoder(), coders.PickleCoder())
-    self.assertEquals(coders.Base64PickleCoder(), coders.Base64PickleCoder())
-    self.assertNotEquals(coders.Base64PickleCoder(), coders.PickleCoder())
-    self.assertNotEquals(coders.Base64PickleCoder(), object())
-
-
-class CodersTest(unittest.TestCase):
-
-  def test_str_utf8_coder(self):
-    real_coder = coders.registry.get_coder(str)
-    expected_coder = coders.BytesCoder()
-    self.assertEqual(
-        real_coder.encode('abc'), expected_coder.encode('abc'))
-    self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
-
-    real_coder = coders.registry.get_coder(bytes)
-    expected_coder = coders.BytesCoder()
-    self.assertEqual(
-        real_coder.encode('abc'), expected_coder.encode('abc'))
-    self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/coders_test_common.py b/sdks/python/google/cloud/dataflow/coders/coders_test_common.py
deleted file mode 100644
index 29eaace..0000000
--- a/sdks/python/google/cloud/dataflow/coders/coders_test_common.py
+++ /dev/null
@@ -1,180 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Tests common to all coder implementations."""
-
-import logging
-import math
-import sys
-import unittest
-
-import dill
-
-import coders
-
-
-# Defined out of line for picklability.
-class CustomCoder(coders.Coder):
-
-  def encode(self, x):
-    return str(x+1)
-
-  def decode(self, encoded):
-    return int(encoded) - 1
-
-
-class CodersTest(unittest.TestCase):
-
-  # These class methods ensure that we test each defined coder in both
-  # nested and unnested context.
-
-  @classmethod
-  def setUpClass(cls):
-    cls.seen = set()
-    cls.seen_nested = set()
-
-  @classmethod
-  def tearDownClass(cls):
-    standard = set(c
-                   for c in coders.__dict__.values()
-                   if isinstance(c, type) and issubclass(c, coders.Coder) and
-                   'Base' not in c.__name__)
-    standard -= set([coders.Coder,
-                     coders.FastCoder,
-                     coders.Base64PickleCoder,
-                     coders.FloatCoder,
-                     coders.TimestampCoder,
-                     coders.ToStringCoder,
-                     coders.WindowCoder,
-                     coders.WindowedValueCoder])
-    assert not standard - cls.seen, standard - cls.seen
-    assert not standard - cls.seen_nested, standard - cls.seen_nested
-
-  @classmethod
-  def _observe(cls, coder):
-    cls.seen.add(type(coder))
-    cls._observe_nested(coder)
-
-  @classmethod
-  def _observe_nested(cls, coder):
-    if isinstance(coder, coders.TupleCoder):
-      for c in coder.coders():
-        cls.seen_nested.add(type(c))
-        cls._observe_nested(c)
-
-  def check_coder(self, coder, *values):
-    self._observe(coder)
-    for v in values:
-      self.assertEqual(v, coder.decode(coder.encode(v)))
-    copy1 = dill.loads(dill.dumps(coder))
-    copy2 = dill.loads(dill.dumps(coder))
-    for v in values:
-      self.assertEqual(v, copy1.decode(copy2.encode(v)))
-
-  def test_custom_coder(self):
-
-    self.check_coder(CustomCoder(), 1, -10, 5)
-    self.check_coder(coders.TupleCoder((CustomCoder(), coders.BytesCoder())),
-                     (1, 'a'), (-10, 'b'), (5, 'c'))
-
-  def test_pickle_coder(self):
-    self.check_coder(coders.PickleCoder(), 'a', 1, 1.5, (1, 2, 3))
-
-  def test_deterministic_pickle_coder(self):
-    coder = coders.DeterministicPickleCoder(coders.PickleCoder(), 'step')
-    self.check_coder(coder, 'a', 1, 1.5, (1, 2, 3))
-    with self.assertRaises(TypeError):
-      self.check_coder(coder, dict())
-    with self.assertRaises(TypeError):
-      self.check_coder(coder, [1, dict()])
-
-    self.check_coder(coders.TupleCoder((coder, coders.PickleCoder())),
-                     (1, dict()), ('a', [dict()]))
-
-  def test_dill_coder(self):
-    cell_value = (lambda x: lambda: x)(0).func_closure[0]
-    self.check_coder(coders.DillCoder(), 'a', 1, cell_value)
-    self.check_coder(
-        coders.TupleCoder((coders.VarIntCoder(), coders.DillCoder())),
-        (1, cell_value))
-
-  def test_bytes_coder(self):
-    self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000)
-
-  def test_varint_coder(self):
-    # Small ints.
-    self.check_coder(coders.VarIntCoder(), *range(-10, 10))
-    # Multi-byte encoding starts at 128.
-    self.check_coder(coders.VarIntCoder(), *range(120, 140))
-    # Large values.
-    self.check_coder(coders.VarIntCoder(),
-                     *[int(math.pow(-1, k) * math.exp(k))
-                       for k in range(0, int(math.log(sys.maxint)))])
-
-  def test_float_coder(self):
-    self.check_coder(coders.FloatCoder(),
-                     *[float(0.1 * x) for x in range(-100, 100)])
-    self.check_coder(coders.FloatCoder(),
-                     *[float(2 ** (0.1 * x)) for x in range(-100, 100)])
-    self.check_coder(coders.FloatCoder(), float('-Inf'), float('Inf'))
-
-  def test_singleton_coder(self):
-    a = 'anything'
-    b = 'something else'
-    self.check_coder(coders.SingletonCoder(a), a)
-    self.check_coder(coders.SingletonCoder(b), b)
-    self.check_coder(coders.TupleCoder((coders.SingletonCoder(a),
-                                        coders.SingletonCoder(b))), (a, b))
-
-  def test_timestamp_coder(self):
-    self.check_coder(coders.TimestampCoder(),
-                     *[coders.Timestamp(micros=x) for x in range(-100, 100)])
-    self.check_coder(coders.TimestampCoder(),
-                     coders.Timestamp(micros=-1234567890),
-                     coders.Timestamp(micros=1234567890))
-    self.check_coder(coders.TimestampCoder(),
-                     coders.Timestamp(micros=-1234567890123456789),
-                     coders.Timestamp(micros=1234567890123456789))
-
-  def test_tuple_coder(self):
-    self.check_coder(
-        coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder())),
-        (1, 'a'),
-        (-2, 'a' * 100),
-        (300, 'abc\0' * 5))
-    self.check_coder(
-        coders.TupleCoder(
-            (coders.TupleCoder((coders.PickleCoder(), coders.VarIntCoder())),
-             coders.StrUtf8Coder())),
-        ((1, 2), 'a'),
-        ((-2, 5), u'a\u0101' * 100),
-        ((300, 1), 'abc\0' * 5))
-
-  def test_tuple_sequence_coder(self):
-    int_tuple_coder = coders.TupleSequenceCoder(coders.VarIntCoder())
-    self.check_coder(int_tuple_coder, (1, -1, 0), (), tuple(range(1000)))
-    self.check_coder(
-        coders.TupleCoder((coders.VarIntCoder(), int_tuple_coder)),
-        (1, (1, 2, 3)))
-
-  def test_base64_pickle_coder(self):
-    self.check_coder(coders.Base64PickleCoder(), 'a', 1, 1.5, (1, 2, 3))
-
-  def test_utf8_coder(self):
-    self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0')
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()
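
The check_coder() helper above enforces two properties: values must round-trip through encode/decode, and they must still round-trip when the coder object itself has been serialized and reloaded (via dill), since coders are shipped to workers. A standalone illustration of that property check, using only the standard library and a hypothetical coder (pickle stands in for dill here):

    import pickle


    class IntStrCoder(object):
        """Hypothetical coder used only for this example."""

        def encode(self, value):
            return str(value)

        def decode(self, encoded):
            return int(encoded)


    coder = IntStrCoder()
    for v in (0, -7, 42):
        # Plain round trip.
        assert v == coder.decode(coder.encode(v))
        # Round trip through a serialized copy of the coder, as check_coder
        # does with dill.
        copy = pickle.loads(pickle.dumps(coder))
        assert v == copy.decode(coder.encode(v))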

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/fast_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/fast_coders_test.py b/sdks/python/google/cloud/dataflow/coders/fast_coders_test.py
deleted file mode 100644
index f2f4e6c..0000000
--- a/sdks/python/google/cloud/dataflow/coders/fast_coders_test.py
+++ /dev/null
@@ -1,34 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for compiled implementation of coder impls."""
-
-import logging
-import unittest
-
-
-# Run all the standard coder test cases.
-from google.cloud.dataflow.coders.coders_test_common import *
-
-
-class FastCoders(unittest.TestCase):
-
-  def test_using_fast_impl(self):
-    # pylint: disable=g-import-not-at-top
-    # pylint: disable=unused-variable
-    import google.cloud.dataflow.coders.stream
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/observable.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/observable.py b/sdks/python/google/cloud/dataflow/coders/observable.py
deleted file mode 100644
index 8a01752..0000000
--- a/sdks/python/google/cloud/dataflow/coders/observable.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-"""Observable base class for iterables."""
-
-
-class ObservableMixin(object):
-  """An observable iterable.
-
-  Subclasses need to call self.notify_observers with any object yielded.
-  """
-
-  def __init__(self):
-    self.observers = []
-
-  def register_observer(self, callback):
-    self.observers.append(callback)
-
-  def notify_observers(self, value, **kwargs):
-    for o in self.observers:
-      o(value, **kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/observable_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/observable_test.py b/sdks/python/google/cloud/dataflow/coders/observable_test.py
deleted file mode 100644
index 2b091bf..0000000
--- a/sdks/python/google/cloud/dataflow/coders/observable_test.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Tests for the Observable mixin class."""
-
-import logging
-import unittest
-
-
-from google.cloud.dataflow.coders import observable
-
-
-class ObservableMixinTest(unittest.TestCase):
-  observed_count = 0
-  observed_sum = 0
-  observed_keys = []
-
-  def observer(self, value, key=None):
-    self.observed_count += 1
-    self.observed_sum += value
-    self.observed_keys.append(key)
-
-  def test_observable(self):
-    class Watched(observable.ObservableMixin):
-
-      def __iter__(self):
-        for i in (1, 4, 3):
-          self.notify_observers(i, key='a%d' % i)
-          yield i
-
-    watched = Watched()
-    watched.register_observer(lambda v, key: self.observer(v, key=key))
-    for _ in watched:
-      pass
-
-    self.assertEquals(3, self.observed_count)
-    self.assertEquals(8, self.observed_sum)
-    self.assertEquals(['a1', 'a3', 'a4'], sorted(self.observed_keys))
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/slow_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/slow_coders_test.py b/sdks/python/google/cloud/dataflow/coders/slow_coders_test.py
deleted file mode 100644
index 8cb23ae..0000000
--- a/sdks/python/google/cloud/dataflow/coders/slow_coders_test.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for uncompiled implementation of coder impls."""
-
-import logging
-import unittest
-
-
-# Run all the standard coder test cases.
-from google.cloud.dataflow.coders.coders_test_common import *
-
-
-class SlowCoders(unittest.TestCase):
-
-  def test_using_slow_impl(self):
-    # Assert that we are not using the compiled implementation.
-    with self.assertRaises(ImportError):
-      # pylint: disable=g-import-not-at-top
-      # pylint: disable=unused-variable
-      import google.cloud.dataflow.coders.stream
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/slow_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/slow_stream.py b/sdks/python/google/cloud/dataflow/coders/slow_stream.py
deleted file mode 100644
index ea09d54..0000000
--- a/sdks/python/google/cloud/dataflow/coders/slow_stream.py
+++ /dev/null
@@ -1,136 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A pure Python implementation of stream.pyx."""
-
-import struct
-
-
-class OutputStream(object):
-  """A pure Python implementation of stream.OutputStream."""
-
-  def __init__(self):
-    self.data = []
-
-  def write(self, b, nested=False):
-    assert isinstance(b, str)
-    if nested:
-      self.write_var_int64(len(b))
-    self.data.append(b)
-
-  def write_byte(self, val):
-    self.data.append(chr(val))
-
-  def write_var_int64(self, v):
-    if v < 0:
-      v += 1 << 64
-      if v <= 0:
-        raise ValueError('Value too large (negative).')
-    while True:
-      bits = v & 0x7F
-      v >>= 7
-      if v:
-        bits |= 0x80
-      self.write_byte(bits)
-      if not v:
-        break
-
-  def write_bigendian_int64(self, v):
-    self.write(struct.pack('>q', v))
-
-  def write_bigendian_int32(self, v):
-    self.write(struct.pack('>i', v))
-
-  def write_bigendian_double(self, v):
-    self.write(struct.pack('>d', v))
-
-  def get(self):
-    return ''.join(self.data)
-
-
-class ByteCountingOutputStream(OutputStream):
-  """A pure Python implementation of stream.ByteCountingOutputStream."""
-
-  def __init__(self):
-    # Note that we don't actually use any of the data initialized by our super.
-    super(ByteCountingOutputStream, self).__init__()
-    self.count = 0
-
-  def write(self, byte_array, nested=False):
-    blen = len(byte_array)
-    if nested:
-      self.write_var_int64(blen)
-    self.count += blen
-
-  def write_byte(self, _):
-    self.count += 1
-
-  def get_count(self):
-    return self.count
-
-  def get(self):
-    raise NotImplementedError
-
-  def __str__(self):
-    return '<%s %s>' % (self.__class__.__name__, self.count)
-
-
-class InputStream(object):
-  """A pure Python implementation of stream.InputStream."""
-
-  def __init__(self, data):
-    self.data = data
-    self.pos = 0
-
-  def size(self):
-    return len(self.data) - self.pos
-
-  def read(self, size):
-    self.pos += size
-    return self.data[self.pos - size : self.pos]
-
-  def read_all(self, nested):
-    return self.read(self.read_var_int64() if nested else self.size())
-
-  def read_byte(self):
-    self.pos += 1
-    return ord(self.data[self.pos - 1])
-
-  def read_var_int64(self):
-    shift = 0
-    result = 0
-    while True:
-      byte = self.read_byte()
-      if byte < 0:
-        raise RuntimeError('VarLong not terminated.')
-
-      bits = byte & 0x7F
-      if shift >= 64 or (shift >= 63 and bits > 1):
-        raise RuntimeError('VarLong too long.')
-      result |= bits << shift
-      shift += 7
-      if not byte & 0x80:
-        break
-    if result >= 1 << 63:
-      result -= 1 << 64
-    return result
-
-  def read_bigendian_int64(self):
-    return struct.unpack('>q', self.read(8))[0]
-
-  def read_bigendian_int32(self):
-    return struct.unpack('>i', self.read(4))[0]
-
-  def read_bigendian_double(self):
-    return struct.unpack('>d', self.read(8))[0]
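
The write_var_int64/read_var_int64 pair above implements a base-128 varint: seven payload bits per byte, the high bit marking that more bytes follow, with negative values first mapped into unsigned 64-bit space (so they always occupy ten bytes). A standalone sketch of the round trip, assuming nothing beyond the standard library:

    def encode_var_int64(v):
        if v < 0:
            v += 1 << 64  # Map negatives into unsigned 64-bit space.
        out = []
        while True:
            bits = v & 0x7F
            v >>= 7
            if v:
                bits |= 0x80  # Continuation bit: more bytes follow.
            out.append(bits)
            if not v:
                break
        return bytes(bytearray(out))


    def decode_var_int64(data):
        result, shift = 0, 0
        for byte in bytearray(data):
            result |= (byte & 0x7F) << shift
            shift += 7
            if not byte & 0x80:
                break
        if result >= 1 << 63:
            result -= 1 << 64  # Undo the unsigned mapping.
        return result


    assert encode_var_int64(300) == b'\xac\x02'
    assert decode_var_int64(encode_var_int64(300)) == 300
    assert decode_var_int64(encode_var_int64(-1)) == -1  # Ten-byte encoding.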

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/stream.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/stream.pxd b/sdks/python/google/cloud/dataflow/coders/stream.pxd
deleted file mode 100644
index 3da7324..0000000
--- a/sdks/python/google/cloud/dataflow/coders/stream.pxd
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-cimport libc.stdint
-
-
-cdef class OutputStream(object):
-  cdef char* data
-  cdef size_t size
-  cdef size_t pos
-
-  cpdef write(self, bytes b, bint nested=*)
-  cpdef write_byte(self, unsigned char val)
-  cpdef write_var_int64(self, libc.stdint.int64_t v)
-  cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v)
-  cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v)
-  cpdef write_bigendian_double(self, double d)
-
-  cpdef bytes get(self)
-
-  cdef extend(self, size_t missing)
-
-
-cdef class ByteCountingOutputStream(OutputStream):
-  cdef size_t count
-
-  cpdef write(self, bytes b, bint nested=*)
-  cpdef write_byte(self, unsigned char val)
-  cpdef write_bigendian_int64(self, libc.stdint.int64_t val)
-  cpdef write_bigendian_int32(self, libc.stdint.int32_t val)
-  cpdef size_t get_count(self)
-  cpdef bytes get(self)
-
-
-cdef class InputStream(object):
-  cdef size_t pos
-  cdef bytes all
-  cdef char* allc
-
-  cpdef size_t size(self) except? -1
-  cpdef bytes read(self, size_t len)
-  cpdef long read_byte(self) except? -1
-  cpdef libc.stdint.int64_t read_var_int64(self) except? -1
-  cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1
-  cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1
-  cpdef double read_bigendian_double(self) except? -1
-  cpdef bytes read_all(self, bint nested=*)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/stream.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/stream.pyx b/sdks/python/google/cloud/dataflow/coders/stream.pyx
deleted file mode 100644
index 6df186a..0000000
--- a/sdks/python/google/cloud/dataflow/coders/stream.pyx
+++ /dev/null
@@ -1,201 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-cimport libc.stdlib
-cimport libc.string
-
-
-cdef class OutputStream(object):
-  """An output string stream implementation supporting write() and get()."""
-
-  #TODO(robertwb): Consider using raw C++ streams.
-
-  def __cinit__(self):
-    self.size = 1024
-    self.pos = 0
-    self.data = <char*>libc.stdlib.malloc(self.size)
-    assert self.data, "OutputStream malloc failed."
-
-  def __dealloc__(self):
-    if self.data:
-      libc.stdlib.free(self.data)
-
-  cpdef write(self, bytes b, bint nested=False):
-    cdef size_t blen = len(b)
-    if nested:
-      self.write_var_int64(blen)
-    if self.size < self.pos + blen:
-      self.extend(blen)
-    libc.string.memcpy(self.data + self.pos, <char*>b, blen)
-    self.pos += blen
-
-  cpdef write_byte(self, unsigned char val):
-    if  self.size < self.pos + 1:
-      self.extend(1)
-    self.data[self.pos] = val
-    self.pos += 1
-
-  cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
-    """Encode a long using variable-length encoding to a stream."""
-    cdef libc.stdint.uint64_t v = signed_v
-    cdef long bits
-    while True:
-      bits = v & 0x7F
-      v >>= 7
-      if v:
-        bits |= 0x80
-      self.write_byte(bits)
-      if not v:
-        break
-
-  cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v):
-    cdef libc.stdint.uint64_t v = signed_v
-    if  self.size < self.pos + 8:
-      self.extend(8)
-    self.data[self.pos    ] = <unsigned char>(v >> 56)
-    self.data[self.pos + 1] = <unsigned char>(v >> 48)
-    self.data[self.pos + 2] = <unsigned char>(v >> 40)
-    self.data[self.pos + 3] = <unsigned char>(v >> 32)
-    self.data[self.pos + 4] = <unsigned char>(v >> 24)
-    self.data[self.pos + 5] = <unsigned char>(v >> 16)
-    self.data[self.pos + 6] = <unsigned char>(v >>  8)
-    self.data[self.pos + 7] = <unsigned char>(v      )
-    self.pos += 8
-
-  cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v):
-    cdef libc.stdint.uint32_t v = signed_v
-    if  self.size < self.pos + 4:
-      self.extend(4)
-    self.data[self.pos    ] = <unsigned char>(v >> 24)
-    self.data[self.pos + 1] = <unsigned char>(v >> 16)
-    self.data[self.pos + 2] = <unsigned char>(v >>  8)
-    self.data[self.pos + 3] = <unsigned char>(v      )
-    self.pos += 4
-
-  cpdef write_bigendian_double(self, double d):
-    self.write_bigendian_int64((<libc.stdint.int64_t*><char*>&d)[0])
-
-  cpdef bytes get(self):
-    return self.data[:self.pos]
-
-  cdef extend(self, size_t missing):
-    while missing > self.size - self.pos:
-      self.size *= 2
-    self.data = <char*>libc.stdlib.realloc(self.data, self.size)
-    assert self.data, "OutputStream realloc failed."
-
-
-cdef class ByteCountingOutputStream(OutputStream):
-  """An output string stream implementation that only counts the bytes.
-
-  This implementation counts the number of bytes it "writes" but
-  doesn't actually write them anywhere.  Thus it has write() but not
-  get().  get_count() returns how many bytes were written.
-
-  This is useful for sizing an encoding.
-  """
-
-  def __cinit__(self):
-    self.count = 0
-
-  cpdef write(self, bytes b, bint nested=False):
-    cdef size_t blen = len(b)
-    if nested:
-      self.write_var_int64(blen)
-    self.count += blen
-
-  cpdef write_byte(self, unsigned char _):
-    self.count += 1
-
-  cpdef write_bigendian_int64(self, libc.stdint.int64_t _):
-    self.count += 8
-
-  cpdef write_bigendian_int32(self, libc.stdint.int32_t _):
-    self.count += 4
-
-  cpdef size_t get_count(self):
-    return self.count
-
-  cpdef bytes get(self):
-    raise NotImplementedError
-
-  def __str__(self):
-    return '<%s %s>' % (self.__class__.__name__, self.count)
-
-
-cdef class InputStream(object):
-  """An input string stream implementation supporting read() and size()."""
-
-  def __init__(self, all):
-    self.allc = self.all = all
-
-  cpdef bytes read(self, size_t size):
-    self.pos += size
-    return self.allc[self.pos - size : self.pos]
-
-  cpdef long read_byte(self) except? -1:
-    self.pos += 1
-    # Note: the C++ compiler on Dataflow workers treats the char array below as
-    # a signed char.  This causes incorrect coder behavior unless explicitly
-    # cast to an unsigned char here.
-    return <long>(<unsigned char> self.allc[self.pos - 1])
-
-  cpdef size_t size(self) except? -1:
-    return len(self.all) - self.pos
-
-  cpdef bytes read_all(self, bint nested=False):
-    return self.read(self.read_var_int64() if nested else self.size())
-
-  cpdef libc.stdint.int64_t read_var_int64(self) except? -1:
-    """Decode a variable-length encoded long from a stream."""
-    cdef long byte
-    cdef long bits
-    cdef long shift = 0
-    cdef libc.stdint.int64_t result = 0
-    while True:
-      byte = self.read_byte()
-      if byte < 0:
-        raise RuntimeError('VarInt not terminated.')
-
-      bits = byte & 0x7F
-      if (shift >= sizeof(long) * 8 or
-          (shift >= (sizeof(long) * 8 - 1) and bits > 1)):
-        raise RuntimeError('VarLong too long.')
-      result |= bits << shift
-      shift += 7
-      if not (byte & 0x80):
-        break
-    return result
-
-  cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1:
-    self.pos += 8
-    return (<unsigned char>self.allc[self.pos - 1]
-      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 2] <<  8
-      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 3] << 16
-      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 4] << 24
-      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 5] << 32
-      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 6] << 40
-      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 7] << 48
-      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 8] << 56)
-
-  cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1:
-    self.pos += 4
-    return (<unsigned char>self.allc[self.pos - 1]
-      | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 2] <<  8
-      | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 3] << 16
-      | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 4] << 24)
-
-  cpdef double read_bigendian_double(self) except? -1:
-    cdef libc.stdint.int64_t as_long = self.read_bigendian_int64()
-    return (<double*><char*>&as_long)[0]
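
As the ByteCountingOutputStream docstring above notes, a stream that only counts bytes is useful for sizing an encoding before committing to a real buffer. A minimal pure-Python sketch of that idea, with hypothetical names standing in for the real stream API:

    class CountingStream(object):
        """Hypothetical stream that records sizes instead of buffering bytes."""

        def __init__(self):
            self.count = 0

        def write(self, b):
            self.count += len(b)


    def encoded_size(encode_to_stream, value):
        # Run the encoder against a stream that only counts, so nothing is copied.
        stream = CountingStream()
        encode_to_stream(value, stream)
        return stream.count


    def encode_utf8(value, stream):
        stream.write(value.encode('utf-8'))


    assert encoded_size(encode_utf8, u'abc') == 3
    assert encoded_size(encode_utf8, u'\u0101') == 2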

