From: mbalassi@apache.org
To: commits@flink.apache.org
Date: Thu, 24 Nov 2016 21:23:15 -0000
Subject: [2/4] flink git commit: [FLINK-3702] Make FieldAccessors support nested field expressions.

[FLINK-3702] Make FieldAccessors support nested field expressions.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f04542e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f04542e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f04542e

Branch: refs/heads/master
Commit: 1f04542e861f9c156d7b5c1f6db72a74e08d7a75
Parents: 5d2da12
Author: Gabor Gevay
Authored: Sun May 22 19:48:50 2016 +0200
Committer: Marton Balassi
Committed: Thu Nov 24 22:22:42 2016 +0100

----------------------------------------------------------------------
 docs/dev/api_concepts.md                        |   4 +-
 .../api/common/typeinfo/BasicArrayTypeInfo.java |  18 +
 .../api/common/typeinfo/BasicTypeInfo.java      |  26 ++
 .../InvalidFieldReferenceException.java         |  31 ++
 .../common/typeinfo/PrimitiveArrayTypeInfo.java |  18 +
 .../api/common/typeinfo/TypeInformation.java    |  34 ++
 .../api/common/typeutils/CompositeType.java     |  10 -
 .../api/common/typeutils/TypeSerializer.java    |   2 +-
 .../flink/api/java/typeutils/FieldAccessor.java | 324 ++++++++++++++++++
 .../flink/api/java/typeutils/PojoField.java     |  22 +-
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  42 ++-
 .../api/java/typeutils/TupleTypeInfoBase.java   |  32 +-
 .../java/typeutils/runtime/FieldSerializer.java |  54 +++
 .../java/typeutils/runtime/PojoComparator.java  |  21 +-
 .../java/typeutils/runtime/PojoSerializer.java  |  21 +-
 .../api/java/typeutils/FieldAccessorTest.java   | 343 +++++++++++++++++++
 .../api/java/functions/SemanticPropUtil.java    |   2 +-
 .../flink/api/java/operator/DataSinkTest.java   |   5 +-
 .../operator/FullOuterJoinOperatorTest.java     |   3 +-
 .../operator/LeftOuterJoinOperatorTest.java     |   3 +-
 .../operator/RightOuterJoinOperatorTest.java    |   3 +-
 .../scala/typeutils/ProductFieldAccessor.java   |  75 ++++
 .../api/scala/typeutils/CaseClassTypeInfo.scala |  38 +-
 .../scala/typeutils/CaseClassTypeInfoTest.scala | 110 ++++++
 .../streaming/api/datastream/KeyedStream.java   | 140 +++++---
 .../aggregation/ComparableAggregator.java       |   6 +-
 .../functions/aggregation/SumAggregator.java    |   6 +-
 .../flink/streaming/util/FieldAccessor.java     | 254 --------------
 .../flink/streaming/util/FieldAccessorTest.java |  75 ----
 .../flink/streaming/api/scala/KeyedStream.scala |  90 ++++-
 .../streaming/runtime/DataStreamPojoITCase.java |  42 ++-
 31 files changed, 1366 insertions(+), 488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/docs/dev/api_concepts.md
----------------------------------------------------------------------
diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md
index 49d2ded..07a81e7 100644
--- a/docs/dev/api_concepts.md
+++ b/docs/dev/api_concepts.md
@@ -385,7 +385,7 @@ while a key can be specified on a DataStream using
 {% highlight java %}
 DataStream<...> input = // [...]
 DataStream<...> windowed = input
-	.key(/*define key here*/)
+	.keyBy(/*define key here*/)
 	.window(/*window specification*/);
 {% endhighlight %}

@@ -418,7 +418,7 @@ val keyed = input.keyBy(0)

-The tuples is grouped on the first field (the one of
+The tuples are grouped on the first field (the one of
 Integer type).

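The nested field expressions named in the commit subject (for example "f1.a.foo" or "foo.f1.bar", as used in the FieldAccessor Javadoc of this commit) are resolved one level at a time: the first part selects a field, and the remainder is handed to the accessor of that field's type. The following is a minimal standalone sketch of that head/tail split. It assumes the same regular-expression shape as the commit's REGEX_FIELD and REGEX_NESTED_FIELDS; the class name FieldExpressionSketch and the method decompose are hypothetical and not part of Flink, and the wildcard handling of the real code is left out.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration class; it is not part of Flink.
class FieldExpressionSketch {

    // Same shape as the commit's REGEX_FIELD: a part may start with a digit (tuple positions).
    private static final String FIELD = "[\\p{L}\\p{Digit}_\\$]*";

    // Group 1 is the head, group 3 is everything after the first dot.
    private static final Pattern NESTED = Pattern.compile("(" + FIELD + ")(\\.(.+))?");

    /** Returns {head, tail}; tail is null when the expression has only one part. */
    static String[] decompose(String expression) {
        Matcher m = NESTED.matcher(expression);
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid field expression \"" + expression + "\".");
        }
        return new String[] { m.group(1), m.group(3) };
    }

    public static void main(String[] args) {
        String[] parts = decompose("foo.f1.bar");
        System.out.println(parts[0] + " | " + parts[1]); // prints "foo | f1.bar"
    }
}
```

Splitting off only the first part keeps each type responsible for its own level: a POJO resolves the head by field name, a tuple by position, and either one passes the tail on unchanged.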
http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
index 25b2850..d04e7d9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
+import org.apache.flink.api.java.typeutils.FieldAccessor;

 import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -121,6 +122,23 @@ public final class BasicArrayTypeInfo extends TypeInformation {
 	}

 	@Override
+	@PublicEvolving
+	public FieldAccessor getFieldAccessor(int pos, ExecutionConfig config) {
+		return new FieldAccessor.ArrayFieldAccessor<>(pos, this);
+	}
+
+	@Override
+	@PublicEvolving
+	public FieldAccessor getFieldAccessor(String pos, ExecutionConfig config) {
+		try {
+			return new FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(pos), this);
+		} catch (NumberFormatException ex) {
+			throw new InvalidFieldReferenceException
+				("A field expression on an array must be an integer index (that might be given as a string).");
+		}
+	}
+
+	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof BasicArrayTypeInfo) {
 			BasicArrayTypeInfo other = (BasicArrayTypeInfo) obj;

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index e2fd74e..09efba6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -58,6 +58,7 @@ import org.apache.flink.api.common.typeutils.base.ShortSerializer;
 import org.apache.flink.api.common.typeutils.base.StringComparator;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.api.java.typeutils.FieldAccessor;

 import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -171,6 +172,31 @@ public class BasicTypeInfo extends TypeInformation implements AtomicType FieldAccessor getFieldAccessor(int pos, ExecutionConfig config) {
+		if(pos != 0) {
+			throw new InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " +
+				"basic type (" + this.toString() + "). A field expression on a basic type can only select " +
+				"the 0th field (which means selecting the entire basic type).");
+		}
+		return (FieldAccessor) new FieldAccessor.SimpleFieldAccessor(this);
+	}
+
+	@Override
+	@PublicEvolving
+	public FieldAccessor getFieldAccessor(String field, ExecutionConfig config) {
+		try {
+			int pos = field.equals("*") ? 0 : Integer.parseInt(field);
+			return getFieldAccessor(pos, config);
+		} catch (NumberFormatException ex) {
+			throw new InvalidFieldReferenceException("You tried to select the field \"" + field +
+				"\" on a " + this.toString() + ". A field expression on a basic type can only be \"*\" or \"0\"" +
+				" (both of which mean selecting the entire basic type).");
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------

 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java
new file mode 100644
index 0000000..3c67c46
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+@PublicEvolving
+public class InvalidFieldReferenceException extends IllegalArgumentException {
+
+	private static final long serialVersionUID = 1L;
+
+	public InvalidFieldReferenceException(String s) {
+		super(s);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 1c6ce00..2bd96d3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -40,6 +40,7 @@ import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerial
 import org.apache.flink.api.common.typeutils.base.array.PrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
+import org.apache.flink.api.java.typeutils.FieldAccessor;

 import java.util.HashMap;
 import java.util.Map;
@@ -138,6 +139,23 @@ public class PrimitiveArrayTypeInfo extends TypeInformation implements Ato
 		return this.serializer;
 	}

+	@Override
+	@PublicEvolving
+	public FieldAccessor getFieldAccessor(int pos, ExecutionConfig config) {
+		return new FieldAccessor.ArrayFieldAccessor<>(pos, this);
+	}
+
+	@Override
+	@PublicEvolving
+	public FieldAccessor getFieldAccessor(String pos, ExecutionConfig config) {
+		try {
+			return new FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(pos), this);
+		} catch (NumberFormatException ex) {
+			throw new InvalidFieldReferenceException
+				("A field expression on an array must be an integer index (that might be given as a string).");
+		}
+	}
+
 	/**
 	 * Gets the class that represents the component type.
 	 * @return The class of the component type.

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 154ceb1..7be2b68 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.FieldAccessor;
 import org.apache.flink.api.java.typeutils.TypeExtractor;

 import java.io.Serializable;
@@ -172,6 +173,39 @@ public abstract class TypeInformation implements Serializable {
 	@PublicEvolving
 	public abstract TypeSerializer createSerializer(ExecutionConfig config);

+
+	/**
+	 * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
+	 * the specified field on instances of this type.
+	 *
+	 * @param pos The field position (zero-based)
+	 * @param config Configuration object
+	 * @param The type of the field to access
+	 * @return The created FieldAccessor
+	 */
+	@PublicEvolving
+	public FieldAccessor getFieldAccessor(int pos, ExecutionConfig config){
+		throw new InvalidFieldReferenceException("Cannot reference field by position on " + this.toString() +
+			"Referencing a field by position is supported on tuples, case classes, and arrays. " +
+			"Additionally, you can select the 0th field of a primitive/basic type (e.g. int).");
+	}
+
+	/**
+	 * Creates a {@link FieldAccessor} for the field that is given by a field expression,
+	 * which can be used to get and set the specified field on instances of this type.
+	 *
+	 * @param field The field expression
+	 * @param config Configuration object
+	 * @param The type of the field to access
+	 * @return The created FieldAccessor
+	 */
+	@PublicEvolving
+	public FieldAccessor getFieldAccessor(String field, ExecutionConfig config) {
+		throw new InvalidFieldReferenceException("Cannot reference field by field expression on " + this.toString() +
+			"Field expressions are only supported on POJO types, tuples, and case classes. " +
+			"(See the Flink documentation on what is considered a POJO.)");
+	}
+
 	@Override
 	public abstract String toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 4bf17ea..a4230f4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -265,16 +265,6 @@ public abstract class CompositeType extends TypeInformation {
 	@PublicEvolving
 	public abstract int getFieldIndex(String fieldName);

-	@PublicEvolving
-	public static class InvalidFieldReferenceException extends IllegalArgumentException {
-
-		private static final long serialVersionUID = 1L;
-
-		public InvalidFieldReferenceException(String s) {
-			super(s);
-		}
-	}
-
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof CompositeType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 0d56743..5e81db7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;

 /**
- * This interface describes the methods that are required for a data type to be handled by the pact
+ * This interface describes the methods that are required for a data type to be handled by the Flink
  * runtime. Specifically, this interface contains the serialization and copying methods.
  *
  * The methods in this class are assumed to be stateless, such that it is effectively thread safe. Stateful

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java
new file mode 100644
index 0000000..97ef31a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by the user as either an index
+ * or a field expression string. TypeInformation can also be requested for the field.
+ * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	protected TypeInformation fieldType;
+
+	/**
+	 * Gets the TypeInformation for the type of the field.
+	 * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]).
+	 */
+	@SuppressWarnings("unchecked")
+	public TypeInformation getFieldType() {
+		return fieldType;
+	}
+
+
+	/**
+	 * Gets the value of the field (specified in the constructor) of the given record.
+	 * @param record The record on which the field will be accessed
+	 * @return The value of the field.
+	 */
+	public abstract F get(T record);
+
+	/**
+	 * Sets the field (specified in the constructor) of the given record to the given value.
+	 *
+	 * Warning: This might modify the original object, or might return a new object instance.
+	 * (This is necessary, because the record might be immutable.)
+	 *
+	 * @param record The record to modify
+	 * @param fieldValue The new value of the field
+	 * @return A record that has the given field value. (this might be a new instance or the original)
+	 */
+	public abstract T set(T record, F fieldValue);
+
+
+	// --------------------------------------------------------------------------------------------------
+
+
+	/**
+	 * This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a
+	 * field of a POJO that is itself some composite type but is not further decomposed)
+	 */
+	public final static class SimpleFieldAccessor extends FieldAccessor {
+
+		private static final long serialVersionUID = 1L;
+
+		public SimpleFieldAccessor(TypeInformation typeInfo) {
+			checkNotNull(typeInfo, "typeInfo must not be null.");
+
+			this.fieldType = typeInfo;
+		}
+
+		@Override
+		public T get(T record) {
+			return record;
+		}
+
+		@Override
+		public T set(T record, T fieldValue) {
+			return fieldValue;
+		}
+	}
+
+	public final static class ArrayFieldAccessor extends FieldAccessor {
+
+		private static final long serialVersionUID = 1L;
+
+		private final int pos;
+
+		public ArrayFieldAccessor(int pos, TypeInformation typeInfo) {
+			if(pos < 0) {
+				throw new InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on" +
+					" an array, which is an invalid index.");
+			}
+			checkNotNull(typeInfo, "typeInfo must not be null.");
+
+			this.pos = pos;
+			this.fieldType = BasicTypeInfo.getInfoFor(typeInfo.getTypeClass().getComponentType());
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public F get(T record) {
+			return (F) Array.get(record, pos);
+		}
+
+		@Override
+		public T set(T record, F fieldValue) {
+			Array.set(record, pos, fieldValue);
+			return record;
+		}
+	}
+
+	/**
+	 * There are two versions of TupleFieldAccessor, differing in whether there is an other
+	 * FieldAccessor nested inside. The no inner accessor version is probably a little faster.
+	 */
+	static final class SimpleTupleFieldAccessor extends FieldAccessor {
+
+		private static final long serialVersionUID = 1L;
+
+		private final int pos;
+
+		SimpleTupleFieldAccessor(int pos, TypeInformation typeInfo) {
+			int arity = ((TupleTypeInfo)typeInfo).getArity();
+			if(pos < 0 || pos >= arity) {
+				throw new InvalidFieldReferenceException(
+					"Tried to select " + ((Integer) pos).toString() + ". field on \"" +
+					typeInfo.toString() + "\", which is an invalid index.");
+			}
+			checkNotNull(typeInfo, "typeInfo must not be null.");
+
+			this.pos = pos;
+			this.fieldType = ((TupleTypeInfo)typeInfo).getTypeAt(pos);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public F get(T record) {
+			final Tuple tuple = (Tuple) record;
+			return (F) tuple.getField(pos);
+		}
+
+		@Override
+		public T set(T record, F fieldValue) {
+			final Tuple tuple = (Tuple) record;
+			tuple.setField(fieldValue, pos);
+			return record;
+		}
+	}
+
+	/**
+	 * @param The Tuple type
+	 * @param The field type at the first level
+	 * @param The field type at the innermost level
+	 */
+	static final class RecursiveTupleFieldAccessor extends FieldAccessor {
+
+		private static final long serialVersionUID = 1L;
+
+		private final int pos;
+		private final FieldAccessor innerAccessor;
+
+		RecursiveTupleFieldAccessor(int pos, FieldAccessor innerAccessor) {
+			if(pos < 0) {
+				throw new InvalidFieldReferenceException("Tried to select " + ((Integer) pos).toString() + ". field.");
+			}
+			checkNotNull(innerAccessor, "innerAccessor must not be null.");
+
+			this.pos = pos;
+			this.innerAccessor = innerAccessor;
+			this.fieldType = innerAccessor.fieldType;
+		}
+
+		@Override
+		public F get(T record) {
+			final Tuple tuple = (Tuple) record;
+			final R inner = tuple.getField(pos);
+			return innerAccessor.get(inner);
+		}
+
+		@Override
+		public T set(T record, F fieldValue) {
+			final Tuple tuple = (Tuple) record;
+			final R inner = tuple.getField(pos);
+			tuple.setField(innerAccessor.set(inner, fieldValue), pos);
+			return record;
+		}
+	}
+
+	/**
+	 * @param The POJO type
+	 * @param The field type at the first level
+	 * @param The field type at the innermost level
+	 */
+	static final class PojoFieldAccessor extends FieldAccessor {
+
+		private static final long serialVersionUID = 1L;
+
+		private transient Field field;
+		private final FieldAccessor innerAccessor;
+
+		PojoFieldAccessor(Field field, FieldAccessor innerAccessor) {
+			checkNotNull(field, "field must not be null.");
+			checkNotNull(innerAccessor, "innerAccessor must not be null.");
+
+			this.field = field;
+			this.innerAccessor = innerAccessor;
+			this.fieldType = innerAccessor.fieldType;
+		}
+
+		@Override
+		public F get(T pojo) {
+			try {
+				@SuppressWarnings("unchecked")
+				final R inner = (R)field.get(pojo);
+				return innerAccessor.get(inner);
+			} catch (IllegalAccessException iaex) {
+				throw new RuntimeException("This should not happen since we call setAccesssible(true) in readObject."
+					+ " fields: " + field + " obj: " + pojo);
+			}
+		}
+
+		@Override
+		public T set(T pojo, F valueToSet) {
+			try {
+				@SuppressWarnings("unchecked")
+				final R inner = (R)field.get(pojo);
+				field.set(pojo, innerAccessor.set(inner, valueToSet));
+				return pojo;
+			} catch (IllegalAccessException iaex) {
+				throw new RuntimeException("This should not happen since we call setAccesssible(true) in readObject."
+					+ " fields: " + field + " obj: " + pojo);
+			}
+		}
+
+		private void writeObject(ObjectOutputStream out)
+			throws IOException, ClassNotFoundException {
+			out.defaultWriteObject();
+			FieldSerializer.serializeField(field, out);
+		}
+
+		private void readObject(ObjectInputStream in)
+			throws IOException, ClassNotFoundException {
+			in.defaultReadObject();
+			field = FieldSerializer.deserializeField(in);
+		}
+	}
+
+
+	// --------------------------------------------------------------------------------------------------
+
+	private final static String REGEX_FIELD = "[\\p{L}\\p{Digit}_\\$]*"; // This can start with a digit (because of Tuples)
+	private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
+	private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
+		+"|\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR
+		+"|\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+
+	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
+
+	public static FieldExpression decomposeFieldExpression(String fieldExpression) {
+		Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+		if(!matcher.matches()) {
+			throw new InvalidFieldReferenceException("Invalid field expression \""+fieldExpression+"\".");
+		}
+
+		String head = matcher.group(0);
+		if(head.equals(Keys.ExpressionKeys.SELECT_ALL_CHAR) || head.equals(Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+			throw new InvalidFieldReferenceException("No wildcards are allowed here.");
+		} else {
+			head = matcher.group(1);
+		}
+
+		String tail = matcher.group(3);
+
+		return new FieldExpression(head, tail);
+	}
+
+	/**
+	 * Represents a decomposition of a field expression into its first part, and the rest.
+	 * E.g. "foo.f1.bar" is decomposed into "foo" and "f1.bar".
+	 */
+	public static class FieldExpression implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public String head, tail; // tail can be null, if the field expression had just one part
+
+		FieldExpression(String head, String tail) {
+			this.head = head;
+			this.tail = tail;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
index 026cfa6..2e20415 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -27,6 +27,7 @@ import java.util.Objects;

 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;

 import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -57,30 +58,13 @@ public class PojoField implements Serializable {
 	private void writeObject(ObjectOutputStream out)
 			throws IOException, ClassNotFoundException {
 		out.defaultWriteObject();
-		out.writeObject(field.getDeclaringClass());
-		out.writeUTF(field.getName());
+		FieldSerializer.serializeField(field, out);
 	}

 	private void readObject(ObjectInputStream in)
 			throws IOException, ClassNotFoundException {
 		in.defaultReadObject();
-		Class clazz = (Class)in.readObject();
-		String fieldName = in.readUTF();
-		field = null;
-		// try superclasses as well
-		while (clazz != null) {
-			try {
-				field = clazz.getDeclaredField(fieldName);
-				field.setAccessible(true);
-				break;
-			} catch (NoSuchFieldException e) {
-				clazz = clazz.getSuperclass();
-			}
-		}
-		if (field == null) {
-			throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
-					+ " (" + fieldName + ")");
-		}
+		field = FieldSerializer.deserializeField(in);
 	}

 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 9c65263..72432d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -64,8 +65,8 @@ public class PojoTypeInfo extends CompositeType {
 	private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
 	private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
 	private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
-					+"|\\"+ExpressionKeys.SELECT_ALL_CHAR
-					+"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+		+"|\\"+ExpressionKeys.SELECT_ALL_CHAR
+		+"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;

 	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
 	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
@@ -132,7 +133,7 @@ public class PojoTypeInfo extends CompositeType {
 		// gives only some undefined order.
 		return false;
 	}
-
+

 	@Override
 	@PublicEvolving
@@ -264,6 +265,7 @@ public class PojoTypeInfo extends CompositeType {
 	}

 	@Override
+	@PublicEvolving
 	protected TypeComparatorBuilder createTypeComparatorBuilder() {
 		return new PojoTypeComparatorBuilder();
 	}
@@ -317,7 +319,39 @@ public class PojoTypeInfo extends CompositeType {

 		return new PojoSerializer(getTypeClass(), fieldSerializers, reflectiveFields, config);
 	}
-
+
+	@Override
+	@PublicEvolving
+	public FieldAccessor getFieldAccessor(String fieldExpression, ExecutionConfig config) {
+
+		FieldAccessor.FieldExpression decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
+
+		// get field
+		PojoField field = null;
+		TypeInformation fieldType = null;
+		for (int i = 0; i < fields.length; i++) {
+			if (fields[i].getField().getName().equals(decomp.head)) {
+				field = fields[i];
+				fieldType = fields[i].getTypeInformation();
+				break;
+			}
+		}
+		if (field == null) {
+			throw new InvalidFieldReferenceException("Unable to find field \""+decomp.head+"\" in type "+this+".");
+		}
+
+		if(decomp.tail == null) {
+			@SuppressWarnings("unchecked")
+			FieldAccessor innerAccessor = new FieldAccessor.SimpleFieldAccessor((TypeInformation) fieldType);
+			return new FieldAccessor.PojoFieldAccessor(field.getField(), innerAccessor);
+		} else {
+			@SuppressWarnings("unchecked")
+			FieldAccessor innerAccessor =
+				(FieldAccessor)fieldType.getFieldAccessor(decomp.tail, config);
+			return new FieldAccessor.PojoFieldAccessor(field.getField(), innerAccessor);
+		}
+	}
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof PojoTypeInfo) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index
807fd54..c9a55fc 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -23,7 +23,10 @@ import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; @@ -203,7 +206,34 @@ public abstract class TupleTypeInfoBase extends CompositeType { TypeInformation typed = (TypeInformation) this.types[pos]; return typed; } - + + @Override + @PublicEvolving + public FieldAccessor getFieldAccessor(int pos, ExecutionConfig config) { + return new FieldAccessor.SimpleTupleFieldAccessor(pos, this); + } + + @Override + @PublicEvolving + public FieldAccessor getFieldAccessor(String fieldExpression, ExecutionConfig config) { + FieldAccessor.FieldExpression decomp = FieldAccessor.decomposeFieldExpression(fieldExpression); + int fieldPos = this.getFieldIndex(decomp.head); + if (fieldPos == -1) { + try { + fieldPos = Integer.parseInt(decomp.head); + } catch (NumberFormatException ex) { + throw new InvalidFieldReferenceException("Tried to select field \"" + decomp.head + + "\" on " + this.toString()); + } + } + if (decomp.tail == null) { + return new FieldAccessor.SimpleTupleFieldAccessor(fieldPos, this); + } else { + FieldAccessor innerAccessor = getTypeAt(fieldPos).getFieldAccessor(decomp.tail, config); + return new FieldAccessor.RecursiveTupleFieldAccessor<>(fieldPos, innerAccessor); + } + } + @Override public boolean equals(Object obj) { if (obj instanceof TupleTypeInfoBase) { 
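Editorial sketch (not part of the commit): the getFieldAccessor(String, ExecutionConfig) overloads in the diffs above share one pattern. They split the expression into a head and a tail, resolve the head on the current type, and recurse into the field's type when a tail remains. The self-contained Java example below illustrates that recursion on nested maps rather than on Flink types. NestedFieldSketch, decompose, and get are hypothetical names introduced for illustration only; the real FieldAccessor.decomposeFieldExpression is not shown in this part of the diff and may differ from the simple first-dot split assumed here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Flink.
class NestedFieldSketch {

    // Same shape as FieldAccessor.FieldExpression: tail is null for a single-part expression.
    public static final class FieldExpression {
        public final String head;
        public final String tail;

        FieldExpression(String head, String tail) {
            this.head = head;
            this.tail = tail;
        }
    }

    // Assumption: split at the first dot. The real decomposition may be stricter.
    public static FieldExpression decompose(String expr) {
        int dot = expr.indexOf('.');
        if (dot < 0) {
            return new FieldExpression(expr, null);
        }
        return new FieldExpression(expr.substring(0, dot), expr.substring(dot + 1));
    }

    // Resolve the head on this level, then recurse on the tail, as the accessors above do.
    @SuppressWarnings("unchecked")
    public static Object get(Map<String, Object> record, String expr) {
        FieldExpression decomp = decompose(expr);
        if (!record.containsKey(decomp.head)) {
            throw new IllegalArgumentException("Unable to find field \"" + decomp.head + "\"");
        }
        Object value = record.get(decomp.head);
        if (decomp.tail == null) {
            return value;
        }
        if (!(value instanceof Map)) {
            throw new IllegalArgumentException("Field \"" + decomp.head + "\" has no nested fields");
        }
        return get((Map<String, Object>) value, decomp.tail);
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new HashMap<>();
        inner.put("f0", "ddd");
        Map<String, Object> foo = new HashMap<>();
        foo.put("t", inner);
        Map<String, Object> record = new HashMap<>();
        record.put("f1", foo);

        System.out.println(get(record, "f1.t.f0")); // prints "ddd"
    }
}
```

The expression "f1.t.f0" is the same one used in testTupleInPojoInTuple further down. Each level only needs to understand its own head, which is why tuples, POJOs and case classes can be nested freely in the commit.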
http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java new file mode 100644 index 0000000..057eee9 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.Field; + +/** + * This class is for the serialization of java.lang.reflect.Field, which doesn't implement Serializable, therefore + * readObject/writeObject need to be implemented in classes where there is a field of type java.lang.reflect.Field. + * The two static methods in this class are to be called from these readObject/writeObject methods. 
+ */ +public class FieldSerializer { + + public static void serializeField(Field field, ObjectOutputStream out) throws IOException { + out.writeObject(field.getDeclaringClass()); + out.writeUTF(field.getName()); + } + + public static Field deserializeField(ObjectInputStream in) throws IOException, ClassNotFoundException { + Class clazz = (Class) in.readObject(); + String fieldName = in.readUTF(); + // try superclasses as well + while (clazz != null) { + try { + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + return field; + } catch (NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } + throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup." + + " (" + fieldName + ")"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java index c0c7797..fc4a305 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java @@ -142,8 +142,7 @@ public final class PojoComparator extends CompositeTypeComparator implemen out.defaultWriteObject(); out.writeInt(keyFields.length); for (Field field: keyFields) { - out.writeObject(field.getDeclaringClass()); - out.writeUTF(field.getName()); + FieldSerializer.serializeField(field, out); } } @@ -153,23 +152,7 @@ public final class PojoComparator extends CompositeTypeComparator implemen int numKeyFields = in.readInt(); keyFields = new Field[numKeyFields]; for (int i = 0; i < numKeyFields; i++) { - Class clazz = (Class) in.readObject(); - String fieldName 
= in.readUTF(); - // try superclasses as well - while (clazz != null) { - try { - Field field = clazz.getDeclaredField(fieldName); - field.setAccessible(true); - keyFields[i] = field; - break; - } catch (NoSuchFieldException e) { - clazz = clazz.getSuperclass(); - } - } - if (keyFields[i] == null ) { - throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup." - + " (" + fieldName + ")"); - } + keyFields[i] = FieldSerializer.deserializeField(in); } } http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 9958540..57928b8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -121,8 +121,7 @@ public final class PojoSerializer extends TypeSerializer { out.defaultWriteObject(); out.writeInt(fields.length); for (Field field: fields) { - out.writeObject(field.getDeclaringClass()); - out.writeUTF(field.getName()); + FieldSerializer.serializeField(field, out); } } @@ -132,23 +131,7 @@ public final class PojoSerializer extends TypeSerializer { int numFields = in.readInt(); fields = new Field[numFields]; for (int i = 0; i < numFields; i++) { - Class clazz = (Class)in.readObject(); - String fieldName = in.readUTF(); - fields[i] = null; - // try superclasses as well - while (clazz != null) { - try { - fields[i] = clazz.getDeclaredField(fieldName); - fields[i].setAccessible(true); - break; - } catch (NoSuchFieldException e) { - clazz = clazz.getSuperclass(); - } - } - if (fields[i] == null) { - throw new 
RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup." - + " (" + fieldName + ")"); - } + fields[i] = FieldSerializer.deserializeField(in); } cl = Thread.currentThread().getContextClassLoader(); http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java new file mode 100644 index 0000000..f780447 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.junit.Test; + +public class FieldAccessorTest { + + // Note that AggregationFunctionTest indirectly also tests FieldAccessors. + // ProductFieldAccessor is tested in CaseClassTypeInfoTest. + + @Test + public void testFlatTuple() { + Tuple2<String, Integer> t = Tuple2.of("aa", 5); + TupleTypeInfo<Tuple2<String, Integer>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + t = f0.set(t, "b"); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(5, (int) f1.get(t)); + assertEquals(5, (int) t.f1); + t = f1.set(t, 7); + assertEquals(7, (int) f1.get(t)); + assertEquals(7, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + + FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(7, (int) f1n.get(t)); + assertEquals(7, (int) t.f1); + t = f1n.set(t, 10); + assertEquals(10, (int) f1n.get(t)); + assertEquals(10, (int) f1.get(t)); + assertEquals(10, (int) t.f1); + assertEquals("b", f0.get(t)); + assertEquals("b", t.f0); + + FieldAccessor<Tuple2<String, Integer>, Integer> f1ns = tpeInfo.getFieldAccessor("1", null); + assertEquals(10, (int) f1ns.get(t)); + assertEquals(10, (int) t.f1); + t = f1ns.set(t, 11); + assertEquals(11, (int) f1ns.get(t)); + assertEquals(11, (int) f1.get(t)); + assertEquals(11, (int) t.f1); + assertEquals("b",
f0.get(t)); + assertEquals("b", t.f0); + + // This is technically valid (the ".0" is selecting the 0th field of a basic type). + FieldAccessor<Tuple2<String, Integer>, String> f0_0 = tpeInfo.getFieldAccessor("f0.0", null); + assertEquals("b", f0_0.get(t)); + assertEquals("b", t.f0); + t = f0_0.set(t, "cc"); + assertEquals("cc", f0_0.get(t)); + assertEquals("cc", t.f0); + + try { + FieldAccessor<Tuple2<String, Integer>, String> bad = tpeInfo.getFieldAccessor("almafa", null); + fail("Expected exception, because of bad field name"); + } catch (InvalidFieldReferenceException ex) { + // OK + } + } + + @Test + public void testTupleInTuple() { + Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0)); + TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = tpeInfo.getFieldAccessor("f0", null); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null); + assertEquals(2.0, f1f2.get(t), 0); + assertEquals(2.0, t.f1.f2, 0); + t = f1f2.set(t, 3.0); + assertEquals(3.0, f1f2.get(t), 0); + assertEquals(3.0, t.f1.f2, 0); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null); + assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t)); + assertEquals(Tuple3.of(5, 9L, 3.0), t.f1); + t = f1.set(t, Tuple3.of(8, 12L, 4.0)); + assertEquals(Tuple3.of(8, 12L, 4.0), f1.get(t)); + assertEquals(Tuple3.of(8, 12L, 4.0), t.f1); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + + FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(Tuple3.of(8, 12L, 4.0), f1n.get(t)); + assertEquals(Tuple3.of(8, 12L, 4.0), t.f1); + t = f1n.set(t, Tuple3.of(10, 13L, 5.0)); + assertEquals(Tuple3.of(10, 13L, 5.0), f1n.get(t)); + assertEquals(Tuple3.of(10, 13L, 5.0), f1.get(t)); + assertEquals(Tuple3.of(10, 13L, 5.0), t.f1); + assertEquals("aa", f0.get(t)); + assertEquals("aa", t.f0); + } + + @Test +
@SuppressWarnings("unchecked") + public void testTupleFieldAccessorOutOfBounds() { + try { + TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class).getFieldAccessor(2, null); + fail(); + } catch (InvalidFieldReferenceException e) { + // Nothing to do here + } + } + + public static class Foo { + public int x; + public Tuple2<String, Long> t; + public Short y; + + public Foo() {} + + public Foo(int x, Tuple2<String, Long> t, Short y) { + this.x = x; + this.t = t; + this.y = y; + } + } + + @Test + public void testTupleInPojoInTuple() { + Tuple2<String, Foo> t = Tuple2.of("aa", new Foo(8, Tuple2.of("ddd", 9L), (short) 2)); + TupleTypeInfo<Tuple2<String, Foo>> tpeInfo = + (TupleTypeInfo<Tuple2<String, Foo>>)TypeExtractor.getForObject(t); + + FieldAccessor<Tuple2<String, Foo>, Long> f1tf1 = tpeInfo.getFieldAccessor("f1.t.f1", null); + assertEquals(9L, (long) f1tf1.get(t)); + assertEquals(9L, (long) t.f1.t.f1); + t = f1tf1.set(t, 12L); + assertEquals(12L, (long) f1tf1.get(t)); + assertEquals(12L, (long) t.f1.t.f1); + + FieldAccessor<Tuple2<String, Foo>, String> f1tf0 = tpeInfo.getFieldAccessor("f1.t.f0", null); + assertEquals("ddd", f1tf0.get(t)); + assertEquals("ddd", t.f1.t.f0); + t = f1tf0.set(t, "alma"); + assertEquals("alma", f1tf0.get(t)); + assertEquals("alma", t.f1.t.f0); + + FieldAccessor<Tuple2<String, Foo>, Foo> f1 = tpeInfo.getFieldAccessor("f1", null); + FieldAccessor<Tuple2<String, Foo>, Foo> f1n = tpeInfo.getFieldAccessor(1, null); + assertEquals(Tuple2.of("alma", 12L), f1.get(t).t); + assertEquals(Tuple2.of("alma", 12L), f1n.get(t).t); + assertEquals(Tuple2.of("alma", 12L), t.f1.t); + Foo newFoo = new Foo(8, Tuple2.of("ddd", 9L), (short) 2); + f1.set(t, newFoo); + assertEquals(newFoo, f1.get(t)); + assertEquals(newFoo, f1n.get(t)); + assertEquals(newFoo, t.f1); + } + + + public static class Inner { + public long x; + public boolean b; + + public Inner(){} + + public Inner(long x) { + this.x = x; + } + + public Inner(long x, boolean b) { + this.x = x; + this.b = b; + } + + @Override + public String toString() { + return ((Long)x).toString() + ", " + b; + } + } + + public static class Outer { + public int
a; + public Inner i; + public short b; + + public Outer(){} + + public Outer(int a, Inner i, short b) { + this.a = a; + this.i = i; + this.b = b; + } + + @Override + public String toString() { + return a+", "+i.toString()+", "+b; + } + } + + @Test + public void testPojoInPojo() { + Outer o = new Outer(10, new Inner(4L), (short)12); + PojoTypeInfo<Outer> tpeInfo = (PojoTypeInfo<Outer>)TypeInformation.of(Outer.class); + + FieldAccessor<Outer, Long> fix = tpeInfo.getFieldAccessor("i.x", null); + assertEquals(4L, (long) fix.get(o)); + assertEquals(4L, o.i.x); + o = fix.set(o, 22L); + assertEquals(22L, (long) fix.get(o)); + assertEquals(22L, o.i.x); + + FieldAccessor<Outer, Inner> fi = tpeInfo.getFieldAccessor("i", null); + assertEquals(22L, fi.get(o).x); + assertEquals(22L, (long) fix.get(o)); + assertEquals(22L, o.i.x); + o = fi.set(o, new Inner(30L)); + assertEquals(30L, fi.get(o).x); + assertEquals(30L, (long) fix.get(o)); + assertEquals(30L, o.i.x); + } + + @Test + @SuppressWarnings("unchecked") + public void testArray() { + int[] a = new int[]{3,5}; + FieldAccessor<int[], Integer> fieldAccessor = + (FieldAccessor<int[], Integer>) (Object) + PrimitiveArrayTypeInfo.getInfoFor(a.getClass()).getFieldAccessor(1, null); + + assertEquals(Integer.class, fieldAccessor.getFieldType().getTypeClass()); + + assertEquals((Integer)a[1], fieldAccessor.get(a)); + + a = fieldAccessor.set(a, 6); + assertEquals((Integer)a[1], fieldAccessor.get(a)); + + + + Integer[] b = new Integer[]{3,5}; + FieldAccessor<Integer[], Integer> fieldAccessor2 = + (FieldAccessor<Integer[], Integer>) (Object) + BasicArrayTypeInfo.getInfoFor(b.getClass()).getFieldAccessor(1, null); + + assertEquals(Integer.class, fieldAccessor2.getFieldType().getTypeClass()); + + assertEquals(b[1], fieldAccessor2.get(b)); + + b = fieldAccessor2.set(b, 6); + assertEquals(b[1], fieldAccessor2.get(b)); + } + + public static class ArrayInPojo { + public long x; + public int[] arr; + public int y; + + public ArrayInPojo() {} + + public ArrayInPojo(long x, int[] arr, int y) { + this.x = x; + this.arr = arr; + this.y = y; + } + } + + @Test
+ public void testArrayInPojo() { + ArrayInPojo o = new ArrayInPojo(10L, new int[]{3,4,5}, 12); + PojoTypeInfo<ArrayInPojo> tpeInfo = (PojoTypeInfo<ArrayInPojo>)TypeInformation.of(ArrayInPojo.class); + + FieldAccessor<ArrayInPojo, Integer> fix = tpeInfo.getFieldAccessor("arr.1", null); + assertEquals(4, (int) fix.get(o)); + assertEquals(4, o.arr[1]); + o = fix.set(o, 8); + assertEquals(8, (int) fix.get(o)); + assertEquals(8, o.arr[1]); + } + + @Test + public void testBasicType() { + Long x = 7L; + TypeInformation<Long> tpeInfo = BasicTypeInfo.LONG_TYPE_INFO; + + try { + FieldAccessor<Long, Long> f = tpeInfo.getFieldAccessor(1, null); + fail("Expected exception, because not the 0th field selected for a basic type."); + } catch (InvalidFieldReferenceException ex) { + // OK + } + + try { + FieldAccessor<Long, Long> f = tpeInfo.getFieldAccessor("foo", null); + fail("Expected exception, because not the 0th field selected for a basic type."); + } catch (InvalidFieldReferenceException ex) { + // OK + } + + FieldAccessor<Long, Long> f = tpeInfo.getFieldAccessor(0, null); + assertEquals(7L, (long) f.get(x)); + x = f.set(x, 12L); + assertEquals(12L, (long) f.get(x)); + assertEquals(12L, (long) x); + + FieldAccessor<Long, Long> f2 = tpeInfo.getFieldAccessor("*", null); + assertEquals(12L, (long) f2.get(x)); + x = f2.set(x, 14L); + assertEquals(14L, (long) f2.get(x)); + assertEquals(14L, (long) x); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java index f8c76e1..4a0b2fc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java @@ -28,7 +28,7 @@ import
org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; -import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java index 0493583..0da417b 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.operator; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -166,7 +167,7 @@ public class DataSinkTest { .sortLocalOutput(5, Order.DESCENDING); } - @Test(expected = CompositeType.InvalidFieldReferenceException.class) + @Test(expected = InvalidFieldReferenceException.class) public void testFailTupleInv() { final ExecutionEnvironment env = 
ExecutionEnvironment @@ -284,7 +285,7 @@ public class DataSinkTest { .sortLocalOutput(1, Order.DESCENDING); } - @Test(expected = CompositeType.InvalidFieldReferenceException.class) + @Test(expected = InvalidFieldReferenceException.class) public void testFailPojoInvalidField() { final ExecutionEnvironment env = ExecutionEnvironment http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java index 9f2aa41..9f5cfb2 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -135,7 +136,7 @@ public class FullOuterJoinOperatorTest { .with(new DummyJoin()); } - @Test(expected = CompositeType.InvalidFieldReferenceException.class) + @Test(expected = InvalidFieldReferenceException.class) public void testFullOuter8() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo); 
http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java index bfcc3e8..914c75c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -136,7 +137,7 @@ public class LeftOuterJoinOperatorTest { .with(new DummyJoin()); } - @Test(expected = CompositeType.InvalidFieldReferenceException.class) + @Test(expected = InvalidFieldReferenceException.class) public void testLeftOuter8() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo); http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java index 709d830..f5d8129 100644 --- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -135,7 +136,7 @@ public class RightOuterJoinOperatorTest { .with(new DummyJoin()); } - @Test(expected = CompositeType.InvalidFieldReferenceException.class) + @Test(expected = InvalidFieldReferenceException.class) public void testRightOuter8() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo); http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java new file mode 100644 index 0000000..0be6f33 --- /dev/null +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.FieldAccessor;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public final class ProductFieldAccessor extends FieldAccessor {
+
+	private static final long serialVersionUID = 1L;
+
+	private final int pos;
+	private final TupleSerializerBase serializer;
+	private final Object[] fields;
+	private final int length;
+	private final FieldAccessor innerAccessor;
+
+	ProductFieldAccessor(int pos, TypeInformation typeInfo, FieldAccessor innerAccessor, ExecutionConfig config) {
+		int arity = ((TupleTypeInfoBase)typeInfo).getArity();
+		if(pos < 0 || pos >= arity) {
+			throw new InvalidFieldReferenceException(
+				"Tried to select " + ((Integer) pos).toString() + ". field on \"" +
+					typeInfo.toString() + "\", which is an invalid index.");
+		}
+		checkNotNull(typeInfo, "typeInfo must not be null.");
+		checkNotNull(innerAccessor, "innerAccessor must not be null.");
+
+		this.pos = pos;
+		this.fieldType = ((TupleTypeInfoBase)typeInfo).getTypeAt(pos);
+		this.serializer = (TupleSerializerBase)typeInfo.createSerializer(config);
+		this.length = this.serializer.getArity();
+		this.fields = new Object[this.length];
+		this.innerAccessor = innerAccessor;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public F get(T record) {
+		return innerAccessor.get((R)((Product)record).productElement(pos));
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public T set(T record, F fieldValue) {
+		Product prod = (Product)record;
+		for (int i = 0; i < length; i++) {
+			fields[i] = prod.productElement(i);
+		}
+		fields[pos] = innerAccessor.set((R)fields[pos], fieldValue);
+		return serializer.createInstance(fields);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index 2aecd7a..d970dfd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -19,17 +19,18 @@ package org.apache.flink.api.scala.typeutils

 import java.util
-import java.util.regex.{Pattern, Matcher}
+import java.util.regex.{Matcher, Pattern}

-import org.apache.flink.annotation.{PublicEvolving, Public}
+import org.apache.flink.annotation.{Public, PublicEvolving}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.operators.Keys
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
-InvalidFieldReferenceException, FlatFieldDescriptor}
+import org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, TypeComparatorBuilder}
 import org.apache.flink.api.common.typeutils._
 import Keys.ExpressionKeys
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException
+import org.apache.flink.api.java.typeutils.FieldAccessor.SimpleFieldAccessor
+import org.apache.flink.api.java.typeutils.{FieldAccessor, TupleTypeInfoBase}

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -202,7 +203,7 @@ abstract class CaseClassTypeInfo[T <: Product](
   override def getFieldIndex(fieldName: String): Int = {
     val result = fieldNames.indexOf(fieldName)
     if (result != fieldNames.lastIndexOf(fieldName)) {
-      -2
+      -1
     } else {
       result
     }
@@ -238,6 +239,31 @@ abstract class CaseClassTypeInfo[T <: Product](
     }
   }

+  override def getFieldAccessor[F](pos: Int, config: ExecutionConfig): FieldAccessor[T, F] = {
+    new ProductFieldAccessor[T,F,F](
+      pos, this, new SimpleFieldAccessor[F](types(pos).asInstanceOf[TypeInformation[F]]), config)
+  }
+
+  override def getFieldAccessor[F](fieldExpression: String, config: ExecutionConfig):
+    FieldAccessor[T, F] = {
+
+    val decomp = FieldAccessor.decomposeFieldExpression(fieldExpression)
+
+    val pos = getFieldIndex(decomp.head)
+    if(pos < 0) {
+      throw new InvalidFieldReferenceException("Invalid field selected: " + fieldExpression)
+    }
+    val fieldType = types(pos)
+
+    if (decomp.tail == null) {
+      getFieldAccessor(pos, config)
+    } else {
+      val innerAccessor =
+        fieldType.getFieldAccessor[F](decomp.tail, config).asInstanceOf[FieldAccessor[AnyRef, F]]
+      new ProductFieldAccessor[T,Object,F](pos, this, innerAccessor, config)
+    }
+  }
+
   override def toString: String = {
     clazz.getName + "(" + fieldNames.zip(types).map {
       case (n, t) => n + ": " + t

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
index 479483f..a9abea1 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
@@ -21,9 +21,11 @@ package org.apache.flink.api.scala.typeutils
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.FieldAccessorTest
 import org.apache.flink.util.TestLogger
 import org.junit.Test
 import org.scalatest.junit.JUnitSuiteLike
+import org.apache.flink.api.scala._

 class CaseClassTypeInfoTest extends TestLogger with JUnitSuiteLike {

@@ -70,4 +72,112 @@ class CaseClassTypeInfoTest extends TestLogger with JUnitSuiteLike {
     assert(!tpeInfo1.equals(tpeInfo2))
   }

+  @Test
+  def testFieldAccessorFlatCaseClass(): Unit = {
+    case class IntBoolean(foo: Int, bar: Boolean)
+    val tpeInfo = createTypeInformation[IntBoolean]
+
+    {
+      // by field name
+      val accessor1 = tpeInfo.getFieldAccessor[Int]("foo", null)
+      val accessor2 = tpeInfo.getFieldAccessor[Boolean]("bar", null)
+
+      val x1 = IntBoolean(5, false)
+      assert(accessor1.get(x1) == 5)
+      assert(accessor2.get(x1) == false)
+      assert(x1.foo == 5)
+      assert(x1.bar == false)
+
+      val x2: IntBoolean = accessor1.set(x1, 6)
+      assert(accessor1.get(x2) == 6)
+      assert(x2.foo == 6)
+
+      val x3 = accessor2.set(x2, true)
+      assert(x3.bar == true)
+      assert(accessor2.get(x3) == true)
+      assert(x3.foo == 6)
+    }
+
+    {
+      // by field pos
+      val accessor1 = tpeInfo.getFieldAccessor[Int](0, null)
+      val accessor2 = tpeInfo.getFieldAccessor[Boolean](1, null)
+
+      val x1 = IntBoolean(5, false)
+      assert(accessor1.get(x1) == 5)
+      assert(accessor2.get(x1) == false)
+      assert(x1.foo == 5)
+      assert(x1.bar == false)
+
+      val x2: IntBoolean = accessor1.set(x1, 6)
+      assert(accessor1.get(x2) == 6)
+      assert(x2.foo == 6)
+
+      val x3 = accessor2.set(x2, true)
+      assert(x3.bar == true)
+      assert(accessor2.get(x3) == true)
+      assert(x3.foo == 6)
+    }
+  }
+
+  @Test
+  def testFieldAccessorTuple(): Unit = {
+    val tpeInfo = createTypeInformation[(Int, Long)]
+    var x = (5, 6L)
+    val f0 = tpeInfo.getFieldAccessor[Int](0, null)
+    assert(f0.get(x) == 5)
+    x = f0.set(x, 8)
+    assert(f0.get(x) == 8)
+    assert(x._1 == 8)
+  }
+
+  @Test
+  def testFieldAccessorCaseClassInCaseClass(): Unit = {
+    case class Inner(a: Short, b: String)
+    case class Outer(a: Int, i: Inner, b: Boolean)
+    val tpeInfo = createTypeInformation[Outer]
+
+    var x = Outer(1, Inner(2, "alma"), true)
+
+    val fib = tpeInfo.getFieldAccessor[String]("i.b", null)
+    assert(fib.get(x) == "alma")
+    assert(x.i.b == "alma")
+    x = fib.set(x, "korte")
+    assert(fib.get(x) == "korte")
+    assert(x.i.b == "korte")
+
+    val fi = tpeInfo.getFieldAccessor[Inner]("i", null)
+    assert(fi.get(x) == Inner(2, "korte"))
+    x = fi.set(x, Inner(3, "aaa"))
+    assert(x.i == Inner(3, "aaa"))
+  }
+
+  @Test
+  def testFieldAccessorPojoInCaseClass(): Unit = {
+    case class Outer(a: Int, i: FieldAccessorTest.Inner, b: Boolean)
+    var x = Outer(1, new FieldAccessorTest.Inner(3L, true), false)
+    val tpeInfo = createTypeInformation[Outer]
+    val cfg = new ExecutionConfig
+
+    val fib = tpeInfo.getFieldAccessor[Boolean]("i.b", cfg)
+    assert(fib.get(x) == true)
+    assert(x.i.b == true)
+    x = fib.set(x, false)
+    assert(fib.get(x) == false)
+    assert(x.i.b == false)
+
+    val fi = tpeInfo.getFieldAccessor[FieldAccessorTest.Inner]("i", cfg)
+    assert(fi.get(x).x == 3L)
+    assert(x.i.x == 3L)
+    x = fi.set(x, new FieldAccessorTest.Inner(4L, true))
+    assert(fi.get(x).x == 4L)
+    assert(x.i.x == 4L)
+
+    val fin = tpeInfo.getFieldAccessor[FieldAccessorTest.Inner](1, cfg)
+    assert(fin.get(x).x == 4L)
+    assert(x.i.x == 4L)
+    x = fin.set(x, new FieldAccessorTest.Inner(5L, true))
+    assert(fin.get(x).x == 5L)
+    assert(x.i.x == 5L)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 4063b60..264d5d0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -365,7 +365,9 @@ public class KeyedStream extends DataStream {
 	 * per key.
 	 *
 	 * @param positionToSum
-	 *            The position in the data point to sum
+	 *            The field position in the data points to sum. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which is considered as having one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator sum(int positionToSum) {
@@ -373,16 +375,18 @@ public class KeyedStream extends DataStream {
 	}

 	/**
-	 * Applies an aggregation that that gives the current sum of the pojo data
-	 * stream at the given field expressionby the given key. An independent
-	 * aggregate is kept per key. A field expression is either the name of a
-	 * public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * Applies an aggregation that gives the current sum of the data
+	 * stream at the given field by the given key. An independent
+	 * aggregate is kept per key.
 	 *
 	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator sum(String field) {
@@ -390,12 +394,14 @@ public class KeyedStream extends DataStream {
 	}

 	/**
-	 * Applies an aggregation that that gives the current minimum of the data
+	 * Applies an aggregation that gives the current minimum of the data
 	 * stream at the given position by the given key. An independent aggregate
 	 * is kept per key.
 	 *
 	 * @param positionToMin
-	 *            The position in the data point to minimize
+	 *            The field position in the data points to minimize. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which is considered as having one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator min(int positionToMin) {
@@ -404,16 +410,21 @@ public class KeyedStream extends DataStream {
 	}

 	/**
-	 * Applies an aggregation that that gives the current minimum of the pojo
+	 * Applies an aggregation that gives the current minimum of the
 	 * data stream at the given field expression by the given key. An
 	 * independent aggregate is kept per key. A field expression is either the
 	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * {@link DataStream}'s underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.fieldxy" }.
 	 *
 	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator min(String field) {
@@ -427,7 +438,9 @@ public class KeyedStream extends DataStream {
 	 * per key.
 	 *
 	 * @param positionToMax
-	 *            The position in the data point to maximize
+	 *            The field position in the data points to maximize. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which is considered as having one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator max(int positionToMax) {
@@ -436,16 +449,21 @@ public class KeyedStream extends DataStream {
 	}

 	/**
-	 * Applies an aggregation that that gives the current maximum of the pojo
+	 * Applies an aggregation that gives the current maximum of the
 	 * data stream at the given field expression by the given key. An
 	 * independent aggregate is kept per key. A field expression is either the
 	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * {@link DataStream}'s underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.fieldxy" }.
 	 *
 	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator max(String field) {
@@ -454,16 +472,21 @@ public class KeyedStream extends DataStream {
 	}

 	/**
-	 * Applies an aggregation that that gives the current minimum element of the
-	 * pojo data stream by the given field expression by the given key. An
+	 * Applies an aggregation that gives the current minimum element of the
+	 * data stream by the given field expression by the given key. An
 	 * independent aggregate is kept per key. A field expression is either the
 	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * {@link DataStream}'s underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.fieldxy" }.
 	 *
 	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @param first
 	 *            If True then in case of field equality the first object will
 	 *            be returned
@@ -476,16 +499,21 @@ public class KeyedStream extends DataStream {
 	}

 	/**
-	 * Applies an aggregation that that gives the current maximum element of the
-	 * pojo data stream by the given field expression by the given key. An
+	 * Applies an aggregation that gives the current maximum element of the
+	 * data stream by the given field expression by the given key. An
 	 * independent aggregate is kept per key. A field expression is either the
 	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * {@link DataStream}'s underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.fieldxy" }.
 	 *
 	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @param first
 	 *            If True then in case of field equality the first object will
 	 *            be returned
@@ -497,13 +525,15 @@ public class KeyedStream extends DataStream {
 	}

 	/**
-	 * Applies an aggregation that that gives the current element with the
+	 * Applies an aggregation that gives the current element with the
 	 * minimum value at the given position by the given key. An independent
 	 * aggregate is kept per key. If more elements have the minimum value at the
 	 * given position, the operator returns the first one by default.
 	 *
 	 * @param positionToMinBy
-	 *            The position in the data point to minimize
+	 *            The field position in the data points to minimize. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which is considered as having one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator minBy(int positionToMinBy) {
@@ -511,13 +541,19 @@ public class KeyedStream extends DataStream {
 	}

 	/**
-	 * Applies an aggregation that that gives the current element with the
+	 * Applies an aggregation that gives the current element with the
 	 * minimum value at the given position by the given key. An independent
 	 * aggregate is kept per key. If more elements have the minimum value at the
 	 * given position, the operator returns the first one by default.
 	 *
 	 * @param positionToMinBy
-	 *            The position in the data point to minimize
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator minBy(String positionToMinBy) {
@@ -525,14 +561,16 @@ public class KeyedStream extends DataStream {
 	}

 	/**
-	 * Applies an aggregation that that gives the current element with the
+	 * Applies an aggregation that gives the current element with the
 	 * minimum value at the given position by the given key. An independent
 	 * aggregate is kept per key. If more elements have the minimum value at the
 	 * given position, the operator returns either the first or last one,
 	 * depending on the parameter set.
 	 *
 	 * @param positionToMinBy
-	 *            The position in the data point to minimize
+	 *            The field position in the data points to minimize. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which is considered as having one field).
 	 * @param first
 	 *            If true, then the operator return the first element with the
 	 *            minimal value, otherwise returns the last
@@ -544,13 +582,15 @@ public class KeyedStream extends DataStream {
 	}

 	/**
-	 * Applies an aggregation that that gives the current element with the
+	 * Applies an aggregation that gives the current element with the
 	 * maximum value at the given position by the given key. An independent
 	 * aggregate is kept per key. If more elements have the maximum value at the
 	 * given position, the operator returns the first one by default.
 	 *
 	 * @param positionToMaxBy
-	 *            The position in the data point to maximize
+	 *            The field position in the data points to maximize. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which is considered as having one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator maxBy(int positionToMaxBy) {
@@ -558,13 +598,19 @@ public class KeyedStream extends DataStream {
 	}

 	/**
-	 * Applies an aggregation that that gives the current element with the
+	 * Applies an aggregation that gives the current element with the
 	 * maximum value at the given position by the given key. An independent
 	 * aggregate is kept per key. If more elements have the maximum value at the
 	 * given position, the operator returns the first one by default.
 	 *
 	 * @param positionToMaxBy
-	 *            The position in the data point to maximize
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator maxBy(String positionToMaxBy) {
@@ -572,14 +618,16 @@ public class KeyedStream extends DataStream {
 	}

 	/**
-	 * Applies an aggregation that that gives the current element with the
+	 * Applies an aggregation that gives the current element with the
 	 * maximum value at the given position by the given key. An independent
 	 * aggregate is kept per key. If more elements have the maximum value at the
 	 * given position, the operator returns either the first or last one,
 	 * depending on the parameter set.
 	 *
 	 * @param positionToMaxBy
-	 *            The position in the data point to maximize.
+	 *            The field position in the data points to maximize. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which is considered as having one field).
 	 * @param first
 	 *            If true, then the operator return the first element with the
 	 *            maximum value, otherwise returns the last