beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-5092) Nexmark 10x performance regression
Date Wed, 08 Aug 2018 12:35:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-5092?focusedWorklogId=132346&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-132346 ]

ASF GitHub Bot logged work on BEAM-5092:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Aug/18 12:34
            Start Date: 08/Aug/18 12:34
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #6162: [BEAM-5092] Optimize Row comparison.

URL: https://github.com/apache/beam/pull/6162
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index 97252ce7889..47ced3795ee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.io.InputStream;
@@ -29,6 +31,7 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.Row;
 
 /** A {@link Coder} for {@link Row}. It wraps the {@link Coder} for each element directly.
*/
@@ -68,10 +71,21 @@
   @Nullable private transient Coder<Row> delegateCoder = null;
 
   public static RowCoder of(Schema schema) {
-    return new RowCoder(schema, UUID.randomUUID());
+    UUID id = (schema.getUUID() == null) ? UUID.randomUUID() : schema.getUUID();
+    return new RowCoder(schema, id);
   }
 
   private RowCoder(Schema schema, UUID id) {
+    if (schema.getUUID() != null) {
+      checkArgument(
+          schema.getUUID().equals(id),
+          "Schema has a UUID that doesn't match argument to constructor. %s v.s. %s",
+          schema.getUUID(),
+          id);
+    } else {
+      schema = SerializableUtils.clone(schema);
+      schema.setUUID(id);
+    }
     this.schema = schema;
     this.id = id;
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 164b1869d73..43d7a0f813d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -30,6 +30,7 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -47,6 +48,9 @@
   // Cache the hashCode, so it doesn't have to be recomputed. Schema objects are immutable,
so this
   // is correct.
   private final int hashCode;
+  // Every SchemaCoder has a UUID. The schemas created with the same UUID are guaranteed
to be
+  // equal, so we can short circuit comparison.
+  @Nullable private UUID uuid = null;
 
   /** Builder class for building {@link Schema} objects. */
   public static class Builder {
@@ -172,6 +176,17 @@ public static Schema of(Field... fields) {
     return Schema.builder().addFields(fields).build();
   }
 
+  /** Set this schema's UUID. All schemas with the same UUID must be guaranteed to be identical.
*/
+  public void setUUID(UUID uuid) {
+    this.uuid = uuid;
+  }
+
+  /** Get this schema's UUID. */
+  @Nullable
+  public UUID getUUID() {
+    return this.uuid;
+  }
+
   /** Returns true if two Schemas have the same fields in the same order. */
   @Override
   public boolean equals(Object o) {
@@ -182,6 +197,10 @@ public boolean equals(Object o) {
       return false;
     }
     Schema other = (Schema) o;
+    // If both schemas have a UUID set, we can simply compare the UUIDs.
+    if (uuid != null && other.uuid != null) {
+      return Objects.equals(uuid, other.uuid);
+    }
     return Objects.equals(fieldIndices, other.fieldIndices)
         && Objects.equals(getFields(), other.getFields());
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
index 26b1fa4f8cc..f6ccba980b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
@@ -32,7 +32,6 @@
 /** {@link SchemaCoder} is used as the coder for types that have schemas registered. */
 @Experimental(Kind.SCHEMAS)
 public class SchemaCoder<T> extends CustomCoder<T> {
-  private final Schema schema;
   private final RowCoder rowCoder;
   private final SerializableFunction<T, Row> toRowFunction;
   private final SerializableFunction<Row, T> fromRowFunction;
@@ -41,7 +40,6 @@ private SchemaCoder(
       Schema schema,
       SerializableFunction<T, Row> toRowFunction,
       SerializableFunction<Row, T> fromRowFunction) {
-    this.schema = schema;
     this.toRowFunction = toRowFunction;
     this.fromRowFunction = fromRowFunction;
     this.rowCoder = RowCoder.of(schema);
@@ -66,7 +64,7 @@ private SchemaCoder(
 
   /** Returns the schema associated with this type. */
   public Schema getSchema() {
-    return schema;
+    return rowCoder.getSchema();
   }
 
   /** Returns the toRow conversion function. */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 132346)
    Time Spent: 3h  (was: 2h 50m)

> Nexmark 10x performance regression
> ----------------------------------
>
>                 Key: BEAM-5092
>                 URL: https://issues.apache.org/jira/browse/BEAM-5092
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Andrew Pilloud
>            Assignee: Reuven Lax
>            Priority: Critical
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> There appears to be a 10x performance hit on the DirectRunner and Flink Nexmark jobs. It first showed up in this build:
> [https://builds.apache.org/view/A-D/view/Beam/job/beam_PostCommit_Java_Nexmark_Direct/151/changes]
> [https://apache-beam-testing.appspot.com/explore?dashboard=5084698770407424]
> [https://apache-beam-testing.appspot.com/explore?dashboard=5699257587728384]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message