beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4613) Improve performance of SchemaCoder
Date Mon, 25 Jun 2018 22:49:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4613?focusedWorklogId=115694&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-115694
]

ASF GitHub Bot logged work on BEAM-4613:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Jun/18 22:48
            Start Date: 25/Jun/18 22:48
    Worklog Time Spent: 10m 
      Work Description: huygaa11 commented on a change in pull request #5723: [BEAM-4613]
Use ByteBuddy to generate a coder class for a specific Schema.
URL: https://github.com/apache/beam/pull/5723#discussion_r197962788
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/coders/GeneratedRowCoder.java
 ##########
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.coders;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.modifier.FieldManifestation;
+import net.bytebuddy.description.modifier.Ownership;
+import net.bytebuddy.description.modifier.Visibility;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.implementation.FixedValue;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import net.bytebuddy.implementation.bytecode.Duplication;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
+import net.bytebuddy.implementation.bytecode.TypeCreation;
+import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.collection.ArrayFactory;
+import net.bytebuddy.implementation.bytecode.member.FieldAccess;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.reflect.FieldValueGetter;
+
+/**
+ * A utility for automatically generating a {@link Coder} for {@link Row} objects corresponding to
+ * a specific schema. The resulting coder is loaded into the default ClassLoader and returned.
+ *
+ */
+public abstract class GeneratedRowCoder {
+  private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
+  private static final ForLoadedType CODER_TYPE = new ForLoadedType(Coder.class);
+  private static final ForLoadedType LIST_CODER_TYPE = new ForLoadedType(ListCoder.class);
+  private static final ForLoadedType MAP_CODER_TYPE = new ForLoadedType(MapCoder.class);
+  private static final BitSetCoder NULL_LIST_CODER = BitSetCoder.of();
+
+  public static final String CODERS_FIELD_NAME = "FIELD_CODERS";
+
+  // A map of primitive types -> StackManipulations to create their coders.
+  private static final Map<TypeName, StackManipulation> CODER_MAP;
+
+  // Cache of Coder instances that have already been generated, keyed by coder id.
+  private static Map<UUID, Coder<Row>> generatedCoders = Maps.newHashMap();
+
+  static {
+    // Populate CODER_MAP: for every primitive type known to RowCoder, record the bytecode
+    // instruction that invokes the matching coder class's static of() factory. This relies on
+    // each coder class declaring exactly one method named "of".
+    CODER_MAP = Maps.newHashMap();
+    RowCoder.CODER_MAP.forEach(
+        (typeName, primitiveCoder) -> {
+          ForLoadedType coderClass = new ForLoadedType(primitiveCoder.getClass());
+          CODER_MAP.putIfAbsent(
+              typeName,
+              MethodInvocation.invoke(
+                  coderClass.getDeclaredMethods().filter(ElementMatchers.named("of")).getOnly()));
+        });
+  }
+
+  /**
+   * Returns the generated {@link Coder} cached under {@code coderId}, generating it on first use.
+   *
+   * <p>On a cache miss, a ByteBuddy subclass of {@code Coder<Row>} is built for {@code schema},
+   * injected into the class loader of {@link Coder}, instantiated through its no-arg
+   * constructor, and stored in {@code generatedCoders}.
+   *
+   * @param schema the schema the coder is generated for; only consulted on a cache miss
+   * @param coderId the cache key identifying the generated coder
+   * @return the cached or newly generated coder
+   * @throws RuntimeException if the generated class cannot be instantiated; the reflective
+   *     failure is attached as the cause
+   */
+  public static Coder<Row> of(Schema schema, UUID coderId) {
+    // NOTE(review): generatedCoders is created with Maps.newHashMap(); confirm whether this
+    // method can be reached from multiple threads before relying on the cache.
+    return generatedCoders.computeIfAbsent(coderId,
+        unusedId -> {
+          TypeDescription.Generic coderType =
+              TypeDescription.Generic.Builder.parameterizedType(Coder.class, Row.class).build();
+          DynamicType.Builder<Coder> builder =
+              (DynamicType.Builder<Coder>) BYTE_BUDDY.subclass(coderType);
+          builder = createComponentCoders(schema, builder);
+          builder = implementMethods(builder, schema);
+          try {
+            return builder
+                .make()
+                .load(
+                    Coder.class.getClassLoader(),
+                    ClassLoadingStrategy.Default.INJECTION)
+                .getLoaded()
+                .getDeclaredConstructor().newInstance();
+          } catch (InstantiationException | IllegalAccessException
+              | NoSuchMethodException | InvocationTargetException e) {
+            // Keep the original failure as the cause so the stack trace shows why
+            // instantiation of the generated class failed.
+            throw new RuntimeException("Unable to generate coder for schema " + schema, e);
+          }
+        });
+  }
+
+  static DynamicType.Builder<Coder> implementMethods(
+      DynamicType.Builder<Coder> builder, Schema schema) {
+    return builder
+        .defineMethod("getSchema", Schema.class, Visibility.PRIVATE, Ownership.STATIC)
 
 Review comment:
   Since there is no easy way to track down where this method is used, we could add a comment
explaining that the schema is used for decoding rows.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 115694)

> Improve performance of SchemaCoder
> ----------------------------------
>
>                 Key: BEAM-4613
>                 URL: https://issues.apache.org/jira/browse/BEAM-4613
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message