From: amitsela@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Wed, 15 Feb 2017 09:29:17 -0000
Subject: [3/7] beam git commit: Remove duplicate classes from spark runner marking sdk classes Serializable instead.

Remove duplicate classes from spark runner marking sdk classes Serializable instead.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/22865780
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/22865780
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/22865780

Branch: refs/heads/master
Commit: 228657808f76e38f0b16767020d6d7e149d5dcdf
Parents: 31624fe
Author: Aviem Zur
Authored: Thu Jan 19 13:58:14 2017 +0200
Committer: Sela
Committed: Wed Feb 15 11:10:48 2017 +0200

----------------------------------------------------------------------
 .../runners/spark/metrics/MetricAggregator.java | 113 ----------
 .../spark/metrics/MetricsAccumulatorParam.java  |   4 +-
 .../spark/metrics/SparkMetricResults.java       |  28 ++-
 .../spark/metrics/SparkMetricsContainer.java    | 205 +++----------------
 .../beam/sdk/metrics/DistributionData.java      |   3 +-
 .../org/apache/beam/sdk/metrics/MetricKey.java  |   3 +-
 .../apache/beam/sdk/metrics/MetricUpdates.java  |   3 +-
 7 files changed, 46 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricAggregator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricAggregator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricAggregator.java
deleted file mode 100644
index 79e49ce..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricAggregator.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.metrics;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.metrics.DistributionData;
-import org.apache.beam.sdk.metrics.MetricKey;
-
-
-/**
- * Metric values wrapper which adds aggregation methods.
- * @param <ValueT> Metric value type.
- */
-abstract class MetricAggregator<ValueT> implements Serializable {
-  private final MetricKey key;
-  protected ValueT value;
-
-  private MetricAggregator(MetricKey key, ValueT value) {
-    this.key = key;
-    this.value = value;
-  }
-
-  public MetricKey getKey() {
-    return key;
-  }
-
-  public ValueT getValue() {
-    return value;
-  }
-
-  @SuppressWarnings("unused")
-  abstract MetricAggregator<ValueT> updated(ValueT update);
-
-  static class CounterAggregator extends MetricAggregator<Long> {
-    CounterAggregator(MetricKey key, Long value) {
-      super(key, value);
-    }
-
-    @Override
-    CounterAggregator updated(Long counterUpdate) {
-      value = value + counterUpdate;
-      return this;
-    }
-  }
-
-  static class DistributionAggregator extends MetricAggregator<DistributionData> {
-    DistributionAggregator(MetricKey key, DistributionData value) {
-      super(key, value);
-    }
-
-    @Override
-    DistributionAggregator updated(DistributionData distributionUpdate) {
-      this.value = new SparkDistributionData(this.value.combine(distributionUpdate));
-      return this;
-    }
-  }
-
-  static class SparkDistributionData extends DistributionData implements Serializable {
-    private final long sum;
-    private final long count;
-    private final long min;
-    private final long max;
-
-    SparkDistributionData(DistributionData original) {
-      this.sum = original.sum();
-      this.count = original.count();
-      this.min = original.min();
-      this.max = original.max();
-    }
-
-    @Override
-    public long sum() {
-      return sum;
-    }
-
-    @Override
-    public long count() {
-      return count;
-    }
-
-    @Override
-    public long min() {
-      return min;
-    }
-
-    @Override
-    public long max() {
-      return max;
-    }
-  }
-
-  static <T> MetricAggregator<T> updated(MetricAggregator<T> metricAggregator, Object updateValue) {
-    //noinspection unchecked
-    return metricAggregator.updated((T) updateValue);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
index cd54097..9948c81 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
@@ -27,12 +27,12 @@ import org.apache.spark.AccumulatorParam;
 class MetricsAccumulatorParam implements AccumulatorParam<SparkMetricsContainer> {
   @Override
   public SparkMetricsContainer addAccumulator(SparkMetricsContainer c1, SparkMetricsContainer c2) {
-    return c1.merge(c2);
+    return c1.update(c2);
   }
 
   @Override
   public SparkMetricsContainer addInPlace(SparkMetricsContainer c1, SparkMetricsContainer c2) {
-    return c1.merge(c2);
+    return c1.update(c2);
   }
 
   @Override
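Side note on the accumulator semantics above: addAccumulator and addInPlace both fold one SparkMetricsContainer into another, and with this change that fold sums counter updates that share a key (see updateCounters further down). The following is a minimal, self-contained sketch of that merge rule; it uses plain String keys and a standalone class instead of the runner's MetricKey/MetricUpdate types, so all names here are illustrative only.

import java.util.HashMap;
import java.util.Map;

public class CounterMergeSketch {
  // Merge counter updates keyed by metric identity, summing values for the same key.
  // This mirrors the shape of SparkMetricsContainer.updateCounters(), which calls
  // MetricUpdate.create(key, current.getUpdate() + update.getUpdate()).
  static Map<String, Long> merge(Map<String, Long> left, Map<String, Long> right) {
    Map<String, Long> result = new HashMap<>(left);
    for (Map.Entry<String, Long> update : right.entrySet()) {
      Long current = result.get(update.getKey());
      result.put(update.getKey(),
          current != null ? current + update.getValue() : update.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Long> c1 = new HashMap<>();
    c1.put("step1/elementsRead", 10L);
    Map<String, Long> c2 = new HashMap<>();
    c2.put("step1/elementsRead", 5L);
    c2.put("step2/elementsWritten", 7L);
    // step1/elementsRead is summed to 15, step2/elementsWritten is carried over as 7.
    System.out.println(merge(c1, c2));
  }
}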
http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
index 64b92b7..330b060 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
@@ -23,8 +23,7 @@ import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import java.util.Set;
-import org.apache.beam.runners.spark.metrics.MetricAggregator.CounterAggregator;
-import org.apache.beam.runners.spark.metrics.MetricAggregator.DistributionAggregator;
+import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
@@ -32,6 +31,7 @@ import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.metrics.MetricsFilter;
 
@@ -72,10 +72,10 @@ public class SparkMetricResults extends MetricResults {
         .toList();
   }
 
-  private Predicate<MetricAggregator<?>> matchesFilter(final MetricsFilter filter) {
-    return new Predicate<MetricAggregator<?>>() {
+  private Predicate<MetricUpdate<?>> matchesFilter(final MetricsFilter filter) {
+    return new Predicate<MetricUpdate<?>>() {
       @Override
-      public boolean apply(MetricAggregator<?> metricResult) {
+      public boolean apply(MetricUpdate<?> metricResult) {
        return matches(filter, metricResult.getKey());
       }
     };
@@ -116,32 +116,30 @@ public class SparkMetricResults extends MetricResults {
     }
   }
 
-  private static final Function<DistributionAggregator, MetricResult<DistributionResult>>
+  private static final Function<MetricUpdate<DistributionData>, MetricResult<DistributionResult>>
       TO_DISTRIBUTION_RESULT =
-      new Function<DistributionAggregator, MetricResult<DistributionResult>>() {
+      new Function<MetricUpdate<DistributionData>, MetricResult<DistributionResult>>() {
        @Override
-        public MetricResult<DistributionResult>
-        apply(DistributionAggregator metricResult) {
+        public MetricResult<DistributionResult> apply(MetricUpdate<DistributionData> metricResult) {
          if (metricResult != null) {
            MetricKey key = metricResult.getKey();
            return new SparkMetricResult<>(key.metricName(), key.stepName(),
-                metricResult.getValue().extractResult());
+                metricResult.getUpdate().extractResult());
          } else {
            return null;
          }
        }
      };
 
-  private static final Function<CounterAggregator, MetricResult<Long>>
+  private static final Function<MetricUpdate<Long>, MetricResult<Long>>
      TO_COUNTER_RESULT =
-      new Function<CounterAggregator, MetricResult<Long>>() {
+      new Function<MetricUpdate<Long>, MetricResult<Long>>() {
        @Override
-        public MetricResult<Long>
-        apply(CounterAggregator metricResult) {
+        public MetricResult<Long> apply(MetricUpdate<Long> metricResult) {
          if (metricResult != null) {
            MetricKey key = metricResult.getKey();
            return new SparkMetricResult<>(key.metricName(), key.stepName(),
-                metricResult.getValue());
+                metricResult.getUpdate());
          } else {
            return null;
          }
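SparkMetricResults keeps the same Guava FluentIterable pipeline as before — filter with a Predicate built from the MetricsFilter, then transform each element into a MetricResult — only the element type changes from MetricAggregator to the SDK's MetricUpdate. Below is a rough, self-contained sketch of that filter-then-transform shape, with Longs and Strings standing in for the Beam types (the predicate and function bodies are placeholders, not the runner's code); it only needs Guava on the classpath.

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import java.util.Arrays;
import java.util.List;

public class FluentPipelineSketch {
  public static void main(String[] args) {
    List<Long> updates = Arrays.asList(3L, -1L, 7L, -5L);

    List<String> reported = FluentIterable.from(updates)
        .filter(new Predicate<Long>() {
          @Override
          public boolean apply(Long update) {
            return update != null && update >= 0;  // stand-in for matchesFilter(filter)
          }
        })
        .transform(new Function<Long, String>() {
          @Override
          public String apply(Long update) {
            return "counter=" + update;            // stand-in for TO_COUNTER_RESULT
          }
        })
        .toList();

    System.out.println(reported); // [counter=3, counter=7]
  }
}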
http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
index 234cb81..9d5bb47 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
@@ -18,13 +18,9 @@
 package org.apache.beam.runners.spark.metrics;
 
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
@@ -32,12 +28,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import org.apache.beam.runners.spark.metrics.MetricAggregator.CounterAggregator;
-import org.apache.beam.runners.spark.metrics.MetricAggregator.DistributionAggregator;
-import org.apache.beam.runners.spark.metrics.MetricAggregator.SparkDistributionData;
 import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.MetricKey;
-import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -53,7 +45,8 @@ public class SparkMetricsContainer implements Serializable {
   private transient volatile LoadingCache<String, MetricsContainer> metricsContainers;
 
-  private final Map<MetricKey, MetricAggregator<?>> metrics = new HashMap<>();
+  private final Map<MetricKey, MetricUpdate<Long>> counters = new HashMap<>();
+  private final Map<MetricKey, MetricUpdate<DistributionData>> distributions = new HashMap<>();
 
   public MetricsContainer getContainer(String stepName) {
     if (metricsContainers == null) {
@@ -72,69 +65,24 @@ public class SparkMetricsContainer implements Serializable {
     }
   }
 
-  static Collection<CounterAggregator> getCounters() {
-    return
-        FluentIterable
-            .from(getInstance().metrics.values())
-            .filter(IS_COUNTER)
-            .transform(TO_COUNTER)
-            .toList();
+  static Collection<MetricUpdate<Long>> getCounters() {
+    return getInstance().counters.values();
   }
 
-  private static final Predicate<MetricAggregator<?>> IS_COUNTER =
-      new Predicate<MetricAggregator<?>>() {
-        @Override
-        public boolean apply(MetricAggregator<?> input) {
-          return (input instanceof CounterAggregator);
-        }
-      };
-
-  private static final Function<MetricAggregator<?>, CounterAggregator> TO_COUNTER =
-      new Function<MetricAggregator<?>,
-          CounterAggregator>() {
-        @Override
-        public CounterAggregator apply(MetricAggregator<?> metricAggregator) {
-          return (CounterAggregator) metricAggregator;
-        }
-      };
-
-  static Collection<DistributionAggregator> getDistributions() {
-    return
-        FluentIterable
-            .from(getInstance().metrics.values())
-            .filter(IS_DISTRIBUTION)
-            .transform(TO_DISTRIBUTION)
-            .toList();
+  static Collection<MetricUpdate<DistributionData>> getDistributions() {
+    return getInstance().distributions.values();
   }
 
-  private static final Predicate<MetricAggregator<?>> IS_DISTRIBUTION =
-      new Predicate<MetricAggregator<?>>() {
-        @Override
-        public boolean apply(MetricAggregator<?> input) {
-          return (input instanceof DistributionAggregator);
-        }
-      };
-
-  private static final Function<MetricAggregator<?>, DistributionAggregator> TO_DISTRIBUTION =
-      new Function<MetricAggregator<?>, DistributionAggregator>() {
-        @Override
-        public DistributionAggregator apply(MetricAggregator<?> metricAggregator) {
-          return (DistributionAggregator) metricAggregator;
-        }
-      };
-
-  SparkMetricsContainer merge(SparkMetricsContainer other) {
-    return this.updated(other.getAggregators());
+  SparkMetricsContainer update(SparkMetricsContainer other) {
+    this.updateCounters(other.counters.values());
+    this.updateDistributions(other.distributions.values());
+    return this;
   }
 
   private static SparkMetricsContainer getInstance() {
     return MetricsAccumulator.getInstance().value();
   }
 
-  private Collection<MetricAggregator<?>> getAggregators() {
-    return metrics.values();
-  }
-
   private void writeObject(ObjectOutputStream out) throws IOException {
     // Since MetricsContainer instances are not serializable, materialize a serializable map of
    // MetricsAggregators relating to the same metrics. This is done here, when Spark serializes
@@ -148,44 +96,28 @@ public class SparkMetricsContainer implements Serializable {
     if (metricsContainers != null) {
       for (MetricsContainer container : metricsContainers.asMap().values()) {
         MetricUpdates cumulative = container.getCumulative();
-        updated(Iterables.transform(cumulative.counterUpdates(), TO_COUNTER_AGGREGATOR));
-        updated(Iterables.transform(cumulative.distributionUpdates(), TO_DISTRIBUTION_AGGREGATOR));
+        this.updateCounters(cumulative.counterUpdates());
+        this.updateDistributions(cumulative.distributionUpdates());
       }
     }
   }
 
-  private static final Function<MetricUpdate<Long>, MetricAggregator<?>>
-      TO_COUNTER_AGGREGATOR = new Function<MetricUpdate<Long>, MetricAggregator<?>>() {
-    @SuppressWarnings("ConstantConditions")
-    @Override
-    public CounterAggregator
-    apply(MetricUpdate<Long> update) {
-      return update != null ? new CounterAggregator(new SparkMetricKey(update.getKey()),
-          update.getUpdate()) : null;
+  private void updateCounters(Iterable<MetricUpdate<Long>> updates) {
+    for (MetricUpdate<Long> update : updates) {
+      MetricKey key = update.getKey();
+      MetricUpdate<Long> current = counters.get(key);
+      counters.put(key, current != null
+          ? MetricUpdate.create(key, current.getUpdate() + update.getUpdate()) : update);
     }
-  };
-
-  private static final Function<MetricUpdate<DistributionData>, MetricAggregator<?>>
-      TO_DISTRIBUTION_AGGREGATOR =
-      new Function<MetricUpdate<DistributionData>, MetricAggregator<?>>() {
-        @SuppressWarnings("ConstantConditions")
-        @Override
-        public DistributionAggregator
-        apply(MetricUpdate<DistributionData> update) {
-          return update != null ? new DistributionAggregator(new SparkMetricKey(update.getKey()),
-              new SparkDistributionData(update.getUpdate())) : null;
-        }
-      };
+  }
 
-  private SparkMetricsContainer updated(Iterable<MetricAggregator<?>> updates) {
-    for (MetricAggregator<?> update : updates) {
+  private void updateDistributions(Iterable<MetricUpdate<DistributionData>> updates) {
+    for (MetricUpdate<DistributionData> update : updates) {
       MetricKey key = update.getKey();
-      MetricAggregator<?> current = metrics.get(key);
-      Object updateValue = update.getValue();
-      metrics.put(new SparkMetricKey(key),
-          current != null ? MetricAggregator.updated(current, updateValue) : update);
+      MetricUpdate<DistributionData> current = distributions.get(key);
+      distributions.put(key, current != null
+          ? MetricUpdate.create(key, current.getUpdate().combine(update.getUpdate())) : update);
     }
-    return this;
   }
 
   private static class MetricsContainerCacheLoader extends CacheLoader<String, MetricsContainer> {
@@ -196,97 +128,10 @@ public class SparkMetricsContainer implements Serializable {
     }
   }
 
-  private static class SparkMetricKey extends MetricKey implements Serializable {
-    private final String stepName;
-    private final MetricName metricName;
-
-    SparkMetricKey(MetricKey original) {
-      this.stepName = original.stepName();
-      MetricName metricName = original.metricName();
-      this.metricName = new SparkMetricName(metricName.namespace(), metricName.name());
-    }
-
-    @Override
-    public String stepName() {
-      return stepName;
-    }
-
-    @Override
-    public MetricName metricName() {
-      return metricName;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (o instanceof MetricKey) {
-        MetricKey that = (MetricKey) o;
-        return (this.stepName.equals(that.stepName()))
-            && (this.metricName.equals(that.metricName()));
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      int h = 1;
-      h *= 1000003;
-      h ^= stepName.hashCode();
-      h *= 1000003;
-      h ^= metricName.hashCode();
-      return h;
-    }
-  }
-
-  private static class SparkMetricName extends MetricName implements Serializable {
-    private final String namespace;
-    private final String name;
-
-    SparkMetricName(String namespace, String name) {
-      this.namespace = namespace;
-      this.name = name;
-    }
-
-    @Override
-    public String namespace() {
-      return namespace;
-    }
-
-    @Override
-    public String name() {
-      return name;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (o instanceof MetricName) {
-        MetricName that = (MetricName) o;
-        return (this.namespace.equals(that.namespace()))
-            && (this.name.equals(that.name()));
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      int h = 1;
-      h *= 1000003;
-      h ^= namespace.hashCode();
-      h *= 1000003;
-      h ^= name.hashCode();
-      return h;
-    }
-  }
-
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    for (Map.Entry metric : new SparkBeamMetric().renderAll().entrySet()) {
+    for (Map.Entry metric : new SparkBeamMetric().renderAll().entrySet()) {
       sb.append(metric.getKey()).append(": ").append(metric.getValue()).append(" ");
    }
    return sb.toString();
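The distribution path above relies on DistributionData.combine() to fold two distributions into one, which is what makes the removed SparkDistributionData wrapper (and its copy of sum/count/min/max) unnecessary once DistributionData itself is Serializable. Below is a small stand-in for that combine step — the field set follows the sum()/count()/min()/max() accessors visible in this diff, but the class and its merge rule are an assumption for illustration, not the SDK implementation.

final class DistributionSketch {
  final long sum, count, min, max;

  DistributionSketch(long sum, long count, long min, long max) {
    this.sum = sum; this.count = count; this.min = min; this.max = max;
  }

  // Assumed merge rule: totals add up, extremes keep the overall min/max.
  DistributionSketch combine(DistributionSketch other) {
    return new DistributionSketch(
        sum + other.sum,
        count + other.count,
        Math.min(min, other.min),
        Math.max(max, other.max));
  }

  public static void main(String[] args) {
    DistributionSketch a = new DistributionSketch(10, 2, 3, 7);
    DistributionSketch b = new DistributionSketch(4, 1, 4, 4);
    DistributionSketch c = a.combine(b);
    System.out.println(c.sum + " " + c.count + " " + c.min + " " + c.max); // 14 3 3 7
  }
}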
http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
index 59c7fbd..8068e1b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.metrics;
 
 import com.google.auto.value.AutoValue;
+import java.io.Serializable;
 
 /**
  * Data describing the the distribution. This should retain enough detail that it can be combined
@@ -28,7 +29,7 @@ import com.google.auto.value.AutoValue;
  * the approximate value of those quantiles.
  */
 @AutoValue
-public abstract class DistributionData {
+public abstract class DistributionData implements Serializable {
 
   public abstract long sum();
   public abstract long count();

http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
index bfa4df5..8706853 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.metrics;
 
 import com.google.auto.value.AutoValue;
+import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 
@@ -26,7 +27,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  */
 @Experimental(Kind.METRICS)
 @AutoValue
-public abstract class MetricKey {
+public abstract class MetricKey implements Serializable {
 
   /** The step name that is associated with this metric. */
   public abstract String stepName();

http://git-wip-us.apache.org/repos/asf/beam/blob/22865780/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
index e84dc66..56466d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
@@ -19,6 +19,7 @@
 package org.apache.beam.sdk.metrics;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.Iterables;
+import java.io.Serializable;
 import java.util.Collections;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -39,7 +40,7 @@ public abstract class MetricUpdates {
    * @param <T> The type of value representing the update.
    */
   @AutoValue
-  public abstract static class MetricUpdate<T> {
+  public abstract static class MetricUpdate<T> implements Serializable {
 
     /** The key being updated. */
     public abstract MetricKey getKey();
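The three SDK changes above (MetricKey, DistributionData and MetricUpdates.MetricUpdate each implementing Serializable) exist so that the metrics snapshot held by SparkMetricsContainer can travel through Java serialization when Spark ships accumulator values from executors back to the driver. A minimal round-trip sketch of that mechanism, using a hypothetical CounterUpdate stand-in rather than the actual SDK classes:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationRoundTrip {
  // Stand-in for an SDK metrics value; the real classes made Serializable by this
  // commit are MetricKey, MetricUpdate and DistributionData.
  static class CounterUpdate implements Serializable {
    private static final long serialVersionUID = 1L;
    final String step;
    final long value;

    CounterUpdate(String step, long value) {
      this.step = step;
      this.value = value;
    }
  }

  public static void main(String[] args) throws Exception {
    CounterUpdate original = new CounterUpdate("step1", 42L);

    // Java-serialize and deserialize, roughly what happens when Spark sends
    // accumulator contents (here, the metrics container's maps) to the driver.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(original);
    }
    try (ObjectInputStream in = new ObjectInputStream(
        new ByteArrayInputStream(bytes.toByteArray()))) {
      CounterUpdate copy = (CounterUpdate) in.readObject();
      System.out.println(copy.step + "=" + copy.value); // step1=42
    }
  }
}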