From: altay@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Tue, 13 Jun 2017 17:11:42 -0000
Subject: [1/2] beam git commit: Fix WindowValueCoder for large timestamps
archived-at: Tue, 13 Jun 2017 17:11:44 -0000

Repository: beam
Updated Branches:
  refs/heads/master e906fe9c3 -> c33e9b446


Fix WindowValueCoder for large timestamps


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee728f1b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee728f1b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee728f1b

Branch: refs/heads/master
Commit: ee728f1b2f617dac8e5cd729cacf1a46911021e0
Parents: e906fe9
Author: Vikas Kedigehalli
Authored: Mon Jun 12 23:11:22 2017 -0700
Committer: Ahmet Altay
Committed: Tue Jun 13 10:11:20 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py         | 4 ++++
 sdks/python/apache_beam/coders/coders_test_common.py | 8 ++++++++
 2 files changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ee728f1b/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 10298bf..2670250 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -710,6 +710,10 @@ class WindowedValueCoderImpl(StreamCoderImpl):
       timestamp = MAX_TIMESTAMP.micros
     else:
       timestamp *= 1000
+      if timestamp > MAX_TIMESTAMP.micros:
+        timestamp = MAX_TIMESTAMP.micros
+      if timestamp < MIN_TIMESTAMP.micros:
+        timestamp = MIN_TIMESTAMP.micros
     windows = self._windows_coder.decode_from_stream(in_stream, True)
     # Read PaneInfo encoded byte.

http://git-wip-us.apache.org/repos/asf/beam/blob/ee728f1b/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index c9b67b3..577c53a 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -23,6 +23,8 @@ import unittest
 
 import dill
 
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
 import observable
 from apache_beam.transforms import window
 from apache_beam.utils import timestamp
@@ -287,6 +289,12 @@ class CodersTest(unittest.TestCase):
     # Test binary representation
     self.assertEqual('\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01',
                      coder.encode(window.GlobalWindows.windowed_value(1)))
+
+    # Test decoding large timestamp
+    self.assertEqual(
+        coder.decode('\x7f\xdf;dZ\x1c\xac\x08\x00\x00\x00\x01\x0f\x00'),
+        windowed_value.create(0, MIN_TIMESTAMP.micros, (GlobalWindow(),)))
+
     # Test unnested
     self.check_coder(
         coders.WindowedValueCoder(coders.VarIntCoder()),
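
Note on the change: the following is a minimal standalone sketch of the clamping step the diff adds. It is not the Beam implementation. It assumes, without confirmation from the diff, that MIN_TIMESTAMP.micros and MAX_TIMESTAMP.micros are the signed 64-bit bounds and that the first eight bytes of the encoding hold milliseconds as a big-endian uint64 offset by 2**63. The names MIN_MICROS, MAX_MICROS, TIME_SHIFT and decode_timestamp_micros are hypothetical. The equality branches that precede the `else:` in the real method are not visible in the diff and are left out.

```python
import struct

# Assumed constants (not taken from the diff): signed 64-bit microsecond
# bounds and a 2**63 offset applied to the millisecond value on the wire.
MIN_MICROS = -(2 ** 63)
MAX_MICROS = 2 ** 63 - 1
TIME_SHIFT = 1 << 63


def decode_timestamp_micros(prefix):
    """Decode an 8-byte timestamp prefix into microseconds, clamped to range."""
    # Read the big-endian unsigned 64-bit value and undo the assumed offset.
    (raw,) = struct.unpack('>Q', prefix)
    millis = raw - TIME_SHIFT
    # Converting millis to micros can leave the representable range.
    micros = millis * 1000
    # Clamp, mirroring the two `if` statements added in the diff.
    if micros > MAX_MICROS:
        micros = MAX_MICROS
    if micros < MIN_MICROS:
        micros = MIN_MICROS
    return micros


# First eight bytes of the string decoded in the new test case.
large = decode_timestamp_micros(b'\x7f\xdf;dZ\x1c\xac\x08')
print(large == MIN_MICROS)
```

Under these assumptions, the eight-byte prefix used in the new test decodes to a millisecond value whose microsecond equivalent lies below the minimum, so the clamp returns the minimum. That is consistent with the test expecting MIN_TIMESTAMP.micros.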