From commits-return-101446-archive-asf-public=cust-asf.ponee.io@beam.apache.org Thu Apr 11 19:13:22 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 8606D18065D for ; Thu, 11 Apr 2019 21:13:22 +0200 (CEST) Received: (qmail 1713 invoked by uid 500); 11 Apr 2019 19:13:21 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 1704 invoked by uid 99); 11 Apr 2019 19:13:21 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Apr 2019 19:13:21 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id AF36E814B5; Thu, 11 Apr 2019 19:13:21 +0000 (UTC) Date: Thu, 11 Apr 2019 19:13:20 +0000 To: "commits@beam.apache.org" Subject: [beam] branch master updated: [BEAM-4374] Add Beam Distribution Accumulator to use in python's counter factory. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <155500999901.31575.2004320763767368663@gitbox.apache.org> From: pabloem@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: beam X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: cbe4dfbdbe5d0da5152568853ee5e17334dd1b54 X-Git-Newrev: f876166ba02a94ea66954a586810f1f15d36d98e X-Git-Rev: f876166ba02a94ea66954a586810f1f15d36d98e X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new f876166 [BEAM-4374] Add Beam Distribution Accumulator to use in python's counter factory. new 51a13f0 Merge pull request #8272 from ajamato/mean_byte_count_cy_combiner_only f876166 is described below commit f876166ba02a94ea66954a586810f1f15d36d98e Author: Alex Amato AuthorDate: Wed Mar 13 17:56:17 2019 -0700 [BEAM-4374] Add Beam Distribution Accumulator to use in python's counter factory. --- .../runners/dataflow/internal/apiclient.py | 21 ++++++++--- .../runners/dataflow/internal/apiclient_test.py | 44 +++++++++++++++++++++- .../python/apache_beam/transforms/cy_combiners.pxd | 10 +++++ sdks/python/apache_beam/transforms/cy_combiners.py | 36 ++++++++++++++++++ sdks/python/apache_beam/utils/counters.py | 1 + 5 files changed, 105 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index e0a1a56..b0b1325 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -829,8 +829,8 @@ def translate_distribution(distribution_update, metric_update_proto): """Translate metrics DistributionUpdate to dataflow distribution update. Args: - distribution_update: Instance of DistributionData or - DataflowDistributionCounter. + distribution_update: Instance of DistributionData, + DistributionInt64Accumulator or DataflowDistributionCounter. metric_update_proto: Used for report metrics. """ dist_update_proto = dataflow.DistributionUpdate() @@ -838,7 +838,7 @@ def translate_distribution(distribution_update, metric_update_proto): dist_update_proto.max = to_split_int(distribution_update.max) dist_update_proto.count = to_split_int(distribution_update.count) dist_update_proto.sum = to_split_int(distribution_update.sum) - # DatadflowDistributionCounter needs to translate histogram + # DataflowDistributionCounter needs to translate histogram if isinstance(distribution_update, DataflowDistributionCounter): dist_update_proto.histogram = dataflow.Histogram() distribution_update.translate_to_histogram(dist_update_proto.histogram) @@ -969,6 +969,11 @@ def _verify_interpreter_version_is_supported(pipeline_options): # To enable a counter on the service, add it to this dictionary. +# This is required for the legacy python dataflow runner, as portability +# does not communicate to the service via python code, but instead via a +# a runner harness (in C++ or Java). +# TODO(BEAM-7050) : Remove this antipattern, legacy dataflow python +# pipelines will break whenever a new cy_combiner type is used. structured_counter_translations = { cy_combiners.CountCombineFn: ( dataflow.CounterMetadata.KindValueValuesEnum.SUM, @@ -1005,7 +1010,10 @@ structured_counter_translations = { MetricUpdateTranslators.translate_boolean), cy_combiners.DataflowDistributionCounterFn: ( dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION, - translate_distribution) + translate_distribution), + cy_combiners.DistributionInt64Fn: ( + dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION, + translate_distribution), } @@ -1045,5 +1053,8 @@ counter_translations = { MetricUpdateTranslators.translate_boolean), cy_combiners.DataflowDistributionCounterFn: ( dataflow.NameAndKind.KindValueValuesEnum.DISTRIBUTION, - translate_distribution) + translate_distribution), + cy_combiners.DistributionInt64Fn: ( + dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION, + translate_distribution), } diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 77eba7c..2f65716 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -163,7 +163,24 @@ class UtilTest(unittest.TestCase): self.assertEqual((split_number.lowBits, split_number.highBits), (0, number)) - def test_translate_distribution(self): + def test_translate_distribution_using_accumulator(self): + metric_update = dataflow.CounterUpdate() + accumulator = mock.Mock() + accumulator.min = 1 + accumulator.max = 15 + accumulator.sum = 16 + accumulator.count = 2 + apiclient.translate_distribution(accumulator, metric_update) + self.assertEqual(metric_update.distribution.min.lowBits, + accumulator.min) + self.assertEqual(metric_update.distribution.max.lowBits, + accumulator.max) + self.assertEqual(metric_update.distribution.sum.lowBits, + accumulator.sum) + self.assertEqual(metric_update.distribution.count.lowBits, + accumulator.count) + + def test_translate_distribution_using_distribution_data(self): metric_update = dataflow.CounterUpdate() distribution_update = DistributionData(16, 2, 1, 15) apiclient.translate_distribution(distribution_update, metric_update) @@ -176,7 +193,7 @@ class UtilTest(unittest.TestCase): self.assertEqual(metric_update.distribution.count.lowBits, distribution_update.count) - def test_translate_distribution_counter(self): + def test_translate_distribution_using_dataflow_distribution_counter(self): counter_update = DataflowDistributionCounter() counter_update.add_input(1) counter_update.add_input(3) @@ -215,6 +232,29 @@ class UtilTest(unittest.TestCase): self.assertEqual( metric_update.floatingPointMean.count.lowBits, accumulator.count) + def test_translate_means_using_distribution_accumulator(self): + # This is the special case for MeanByteCount. + # Which is reported over the FnAPI as a beam distribution, + # and to the service as a MetricUpdate IntegerMean. + metric_update = dataflow.CounterUpdate() + accumulator = mock.Mock() + accumulator.min = 7 + accumulator.max = 9 + accumulator.sum = 16 + accumulator.count = 2 + apiclient.MetricUpdateTranslators.translate_scalar_mean_int(accumulator, + metric_update) + self.assertEqual(metric_update.integerMean.sum.lowBits, accumulator.sum) + self.assertEqual(metric_update.integerMean.count.lowBits, accumulator.count) + + accumulator.sum = 16.0 + accumulator.count = 2 + apiclient.MetricUpdateTranslators.translate_scalar_mean_float(accumulator, + metric_update) + self.assertEqual(metric_update.floatingPointMean.sum, accumulator.sum) + self.assertEqual( + metric_update.floatingPointMean.count.lowBits, accumulator.count) + def test_default_ip_configuration(self): pipeline_options = PipelineOptions( ['--temp_location', 'gs://any-location/temp']) diff --git a/sdks/python/apache_beam/transforms/cy_combiners.pxd b/sdks/python/apache_beam/transforms/cy_combiners.pxd index c0593e3..8834c5a 100644 --- a/sdks/python/apache_beam/transforms/cy_combiners.pxd +++ b/sdks/python/apache_beam/transforms/cy_combiners.pxd @@ -56,6 +56,16 @@ cdef class MeanInt64Accumulator(object): cpdef merge(self, accumulators) +cdef class DistributionInt64Accumulator(object): + cdef readonly int64_t sum + cdef readonly int64_t count + cdef readonly int64_t min + cdef readonly int64_t max + cpdef add_input(self, int64_t element) + @cython.locals(accumulator=DistributionInt64Accumulator) + cpdef merge(self, accumulators) + + cdef class SumDoubleAccumulator(object): cdef readonly double value cpdef add_input(self, double element) diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py index e239735..139b8a3 100644 --- a/sdks/python/apache_beam/transforms/cy_combiners.py +++ b/sdks/python/apache_beam/transforms/cy_combiners.py @@ -175,6 +175,38 @@ class MeanInt64Accumulator(object): return self.sum // self.count if self.count else _NAN +class DistributionInt64Accumulator(object): + def __init__(self): + self.sum = 0 + self.count = 0 + self.min = INT64_MAX + self.max = INT64_MIN + + def add_input(self, element): + element = int(element) + if not INT64_MIN <= element <= INT64_MAX: + raise OverflowError(element) + self.sum += element + self.count += 1 + self.min = min(self.min, element) + self.max = max(self.max, element) + + def merge(self, accumulators): + for accumulator in accumulators: + self.sum += accumulator.sum + self.count += accumulator.count + self.min = min(self.min, accumulator.min) + self.max = max(self.max, accumulator.max) + + def extract_output(self): + if not INT64_MIN <= self.sum <= INT64_MAX: + self.sum %= 2**64 + if self.sum >= INT64_MAX: + self.sum -= 2**64 + mean = self.sum // self.count if self.count else _NAN + return mean, self.sum, self.count, self.min, self.max + + class CountCombineFn(AccumulatorCombineFn): _accumulator_type = CountAccumulator @@ -195,6 +227,10 @@ class MeanInt64Fn(AccumulatorCombineFn): _accumulator_type = MeanInt64Accumulator +class DistributionInt64Fn(AccumulatorCombineFn): + _accumulator_type = DistributionInt64Accumulator + + _POS_INF = float('inf') _NEG_INF = float('-inf') _NAN = float('nan') diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py index f924853..dcb5683 100644 --- a/sdks/python/apache_beam/utils/counters.py +++ b/sdks/python/apache_beam/utils/counters.py @@ -134,6 +134,7 @@ class Counter(object): # Handy references to common counters. SUM = cy_combiners.SumInt64Fn() MEAN = cy_combiners.MeanInt64Fn() + BEAM_DISTRIBUTION = cy_combiners.DistributionInt64Fn() # Dataflow Distribution Accumulator Fn. # TODO(BEAM-4045): Generalize distribution counter if necessary.