Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7BAD4200BC3 for ; Fri, 18 Nov 2016 09:27:39 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 7A1FC160B04; Fri, 18 Nov 2016 08:27:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 73EE8160AFE for ; Fri, 18 Nov 2016 09:27:38 +0100 (CET) Received: (qmail 94638 invoked by uid 500); 18 Nov 2016 08:27:37 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 94629 invoked by uid 99); 18 Nov 2016 08:27:37 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Nov 2016 08:27:37 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 2ED80C0D64 for ; Fri, 18 Nov 2016 08:27:37 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id BlExNt1q_WDw for ; Fri, 18 Nov 2016 08:27:34 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail 
Server at mx1-lw-eu.apache.org) with SMTP id 3F8635F1E5 for ; Fri, 18 Nov 2016 08:27:33 +0000 (UTC) Received: (qmail 94274 invoked by uid 99); 18 Nov 2016 08:27:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Nov 2016 08:27:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 37F8DE0230; Fri, 18 Nov 2016 08:27:32 +0000 (UTC) From: mbalassi To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #2094: [FLINK-3702] Make FieldAccessors support nested fi... Content-Type: text/plain Message-Id: <20161118082732.37F8DE0230@git1-us-west.apache.org> Date: Fri, 18 Nov 2016 08:27:32 +0000 (UTC) archived-at: Fri, 18 Nov 2016 08:27:39 -0000 Github user mbalassi commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r88615630 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java --- @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; +import scala.Product; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flink.util.Preconditions.checkNotNull; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccessor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessor implements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** + * Gets the TypeInformation for the type of the field. + * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). 
+ */ + @SuppressWarnings("unchecked") + public TypeInformation getFieldType() { + return fieldType; + } + + + /** + * Gets the value of the field (specified in the constructor) of the given record. + * @param record The record on which the field will be accessed + * @return The value of the field. + */ + public abstract F get(T record); + + /** + * Sets the field (specified in the constructor) of the given record to the given value. + * + * Warning: This might modify the original object, or might return a new object instance. + * (This is necessary, because the record might be immutable.) + * + * @param record The record to modify + * @param fieldValue The new value of the field + * @return A record that has the given field value. (this might be a new instance or the original) + */ + public abstract T set(T record, F fieldValue); + + + // -------------------------------------------------------------------------------------------------- + + + /** + * This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a + * field of a POJO that is itself some composite type but is not further decomposed) + */ + final static class SimpleFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + public SimpleFieldAccessor(TypeInformation typeInfo) { + checkNotNull(typeInfo, "typeInfo must not be null."); + + this.fieldType = typeInfo; + } + + @Override + public T get(T record) { + return record; + } + + @Override + public T set(T record, T fieldValue) { + return fieldValue; + } + } + + final static class ArrayFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + private final int pos; + + public ArrayFieldAccessor(int pos, TypeInformation typeInfo) { + if(pos < 0) { + throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". 
field selected on" + + " an array, which is an invalid index."); + } + checkNotNull(typeInfo, "typeInfo must not be null."); + + this.pos = pos; + this.fieldType = BasicTypeInfo.getInfoFor(typeInfo.getTypeClass().getComponentType()); + } + + @SuppressWarnings("unchecked") + @Override + public F get(T record) { + return (F) Array.get(record, pos); + } + + @Override + public T set(T record, F fieldValue) { + Array.set(record, pos, fieldValue); + return record; + } + } + + /** + * There are two versions of TupleFieldAccessor, differing in whether there is another + * FieldAccessor nested inside. The no inner accessor version is probably a little faster. + */ + static final class SimpleTupleFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + private final int pos; + + SimpleTupleFieldAccessor(int pos, TypeInformation typeInfo) { + checkNotNull(typeInfo, "typeInfo must not be null."); + int arity = ((TupleTypeInfo)typeInfo).getArity(); + if(pos < 0 || pos >= arity) { + throw new CompositeType.InvalidFieldReferenceException( + "Tried to select " + ((Integer) pos).toString() + ". 
field on \"" + + typeInfo.toString() + "\", which is an invalid index."); + } + + this.pos = pos; + this.fieldType = ((TupleTypeInfo)typeInfo).getTypeAt(pos); + } + + @SuppressWarnings("unchecked") + @Override + public F get(T record) { + final Tuple tuple = (Tuple) record; + return (F) tuple.getField(pos); + } + + @Override + public T set(T record, F fieldValue) { + final Tuple tuple = (Tuple) record; + tuple.setField(fieldValue, pos); + return record; + } + } + + /** + * @param The Tuple type + * @param The field type at the first level + * @param The field type at the innermost level + */ + static final class RecursiveTupleFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + private final int pos; + private final FieldAccessor innerAccessor; + + RecursiveTupleFieldAccessor(int pos, FieldAccessor innerAccessor, TypeInformation typeInfo) { + checkNotNull(typeInfo, "typeInfo must not be null."); + checkNotNull(innerAccessor, "innerAccessor must not be null."); + + int arity = ((TupleTypeInfo)typeInfo).getArity(); + if(pos < 0 || pos >= arity) { + throw new CompositeType.InvalidFieldReferenceException( + "Tried to select " + ((Integer) pos).toString() + ". field on \"" + + typeInfo.toString() + "\", which is an invalid index."); + } + + if(pos < 0) { + throw new CompositeType.InvalidFieldReferenceException("Tried to select " + ((Integer) pos).toString() + ". 
field."); + } + + this.pos = pos; + this.innerAccessor = innerAccessor; + this.fieldType = innerAccessor.fieldType; + } + + @Override + public F get(T record) { + final Tuple tuple = (Tuple) record; + final R inner = tuple.getField(pos); + return innerAccessor.get(inner); + } + + @Override + public T set(T record, F fieldValue) { + final Tuple tuple = (Tuple) record; + final R inner = tuple.getField(pos); + tuple.setField(innerAccessor.set(inner, fieldValue), pos); + return record; + } + } + + /** + * @param The POJO type + * @param The field type at the first level + * @param The field type at the innermost level + */ + static final class PojoFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + private transient Field field; + private final FieldAccessor innerAccessor; + + PojoFieldAccessor(Field field, FieldAccessor innerAccessor) { + checkNotNull(field, "field must not be null."); + checkNotNull(innerAccessor, "innerAccessor must not be null."); + + this.field = field; + this.innerAccessor = innerAccessor; + this.fieldType = innerAccessor.fieldType; + } + + @Override + public F get(T pojo) { + try { + @SuppressWarnings("unchecked") + final R inner = (R)field.get(pojo); + return innerAccessor.get(inner); + } catch (IllegalAccessException iaex) { + // The Field class is transient and when deserializing its value we also make it accessible + throw new RuntimeException("This should not happen since we call setAccesssible(true) in readObject." 
+ + " fields: " + field + " obj: " + pojo); + } + } + + @Override + public T set(T pojo, F valueToSet) { + try { + @SuppressWarnings("unchecked") + final R inner = (R)field.get(pojo); + field.set(pojo, innerAccessor.set(inner, valueToSet)); + return pojo; + } catch (IllegalAccessException iaex) { + // The Field class is transient and when deserializing its value we also make it accessible + throw new RuntimeException("This should not happen since we call setAccesssible(true) in readObject." + + " fields: " + field + " obj: " + pojo); + } + } + + private void writeObject(ObjectOutputStream out) + throws IOException, ClassNotFoundException { + out.defaultWriteObject(); + FieldSerializer.serializeField(field, out); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + field = FieldSerializer.deserializeField(in); + } + } + + static final class ProductFieldAccessor extends FieldAccessor { + + private static final long serialVersionUID = 1L; + + private final int pos; + private final TupleSerializerBase serializer; + private final Object[] fields; + private final int length; + private final FieldAccessor innerAccessor; + + ProductFieldAccessor(int pos, TypeInformation typeInfo, FieldAccessor innerAccessor, ExecutionConfig config) { + int arity = ((TupleTypeInfoBase)typeInfo).getArity(); + if(pos < 0 || pos >= arity) { + throw new CompositeType.InvalidFieldReferenceException( + "Tried to select " + ((Integer) pos).toString() + ". 
field on \"" + + typeInfo.toString() + "\", which is an invalid index."); + } + checkNotNull(typeInfo, "typeInfo must not be null."); + checkNotNull(innerAccessor, "innerAccessor must not be null."); + + this.pos = pos; + this.fieldType = ((TupleTypeInfoBase)typeInfo).getTypeAt(pos); + this.serializer = (TupleSerializerBase)typeInfo.createSerializer(config); + this.length = this.serializer.getArity(); + this.fields = new Object[this.length]; + this.innerAccessor = innerAccessor; + } + + @SuppressWarnings("unchecked") + @Override + public F get(T record) { + return innerAccessor.get((R)((Product)record).productElement(pos)); + } + + @SuppressWarnings("unchecked") + @Override + public T set(T record, F fieldValue) { + Product prod = (Product)record; + for (int i = 0; i < length; i++) { + fields[i] = prod.productElement(i); + } + fields[pos] = innerAccessor.set((R)fields[pos], fieldValue); + return serializer.createInstance(fields); + } + } + + + // -------------------------------------------------------------------------------------------------- --- End diff -- Good idea. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---