Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 246AD17A0E for ; Wed, 28 Jan 2015 01:24:01 +0000 (UTC) Received: (qmail 24933 invoked by uid 500); 28 Jan 2015 01:24:01 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 24873 invoked by uid 500); 28 Jan 2015 01:24:01 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 24779 invoked by uid 99); 28 Jan 2015 01:24:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Jan 2015 01:24:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 245C1E0E7C; Wed, 28 Jan 2015 01:24:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fhueske@apache.org To: commits@flink.apache.org Date: Wed, 28 Jan 2015 01:24:06 -0000 Message-Id: <7767a33c662842139924cdc8987e72e4@git.apache.org> In-Reply-To: <8ef37b8302da4cbebf76d8adb91087f4@git.apache.org> References: <8ef37b8302da4cbebf76d8adb91087f4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/13] flink git commit: [FLINK-1328] [api-breaking][java-api][scala-api][optimizer] Reworked semantic annotations for functions. - Renamed constantField annotations to forwardedFields annotation - Forwarded fields can be defined for (nested) tuples, Po http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 0103a7b..a6cc8bd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -25,9 +25,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; -import org.apache.commons.lang3.Validate; -import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -45,6 +45,15 @@ import com.google.common.base.Joiner; */ public class PojoTypeInfo extends CompositeType{ + private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*"; + private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; + private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS + +"|\\"+ExpressionKeys.SELECT_ALL_CHAR + +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA; + + private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS); + private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD); + private final Class typeClass; private PojoField[] fields; @@ -103,62 +112,119 @@ public class PojoTypeInfo extends CompositeType{ return Comparable.class.isAssignableFrom(typeClass); } + @Override - public void getKey(String fieldExpression, int offset, List result) { - // handle 'select all' first - if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + public void getFlatFields(String fieldExpression, int offset, List result) { + + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); + if(!matcher.matches()) { + throw new InvalidFieldReferenceException("Invalid POJO field reference \""+fieldExpression+"\"."); + } + + String field = matcher.group(0); + if(field.equals(ExpressionKeys.SELECT_ALL_CHAR) || field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + // handle select all int keyPosition = 0; - for(PojoField field : fields) { - if(field.type instanceof AtomicType) { - result.add(new FlatFieldDescriptor(offset + keyPosition, field.type)); - } else if(field.type instanceof CompositeType) { - CompositeType cType = (CompositeType)field.type; - cType.getKey(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); + for(PojoField pField : fields) { + if(pField.type instanceof CompositeType) { + CompositeType cType = (CompositeType)pField.type; + cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); keyPosition += cType.getTotalFields()-1; } else { - throw new RuntimeException("Unexpected key type: "+field.type); + result.add(new NamedFlatFieldDescriptor(pField.field.getName(), offset + keyPosition, pField.type)); } keyPosition++; } return; + } else { + field = matcher.group(1); + } + + // get field + int fieldPos = -1; + TypeInformation fieldType = null; + for (int i = 0; i < fields.length; i++) { + if (fields[i].field.getName().equals(field)) { + fieldPos = i; + fieldType = fields[i].type; + break; + } + } + if (fieldPos == -1) { + throw new InvalidFieldReferenceException("Unable to find field \""+field+"\" in type "+this+"."); } - Validate.notEmpty(fieldExpression, "Field expression must not be empty."); - // if there is a dot try getting the field from that sub field - int firstDot = fieldExpression.indexOf('.'); - if (firstDot == -1) { - // this is the last field (or only field) in the field expression - int fieldId = 0; - for (int i = 0; i < fields.length; i++) { - if(fields[i].type instanceof CompositeType) { - fieldId += fields[i].type.getTotalFields()-1; + String tail = matcher.group(3); + if(tail == null) { + if(fieldType instanceof CompositeType) { + // forward offset + for(int i=0; i)) { - throw new RuntimeException("Field "+fields[i].type+" (specified by '"+fieldExpression+"') is not a composite type"); - } - CompositeType cType = (CompositeType) fields[i].type; - cType.getKey(rest, offset + fieldId, result); // recurse - return; + if(fieldType instanceof CompositeType) { + // forward offset + for(int i=0; i getTypeAt(String fieldExpression) { + + Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression); + if(!matcher.matches()) { + if (fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here."); + } else { + throw new InvalidFieldReferenceException("Invalid format of POJO field expression \""+fieldExpression+"\"."); + } + } + + String field = matcher.group(1); + // get field + int fieldPos = -1; + TypeInformation fieldType = null; + for (int i = 0; i < fields.length; i++) { + if (fields[i].field.getName().equals(field)) { + fieldPos = i; + fieldType = fields[i].type; + break; + } + } + if (fieldPos == -1) { + throw new InvalidFieldReferenceException("Unable to find field \""+field+"\" in type "+this+"."); + } + + String tail = matcher.group(3); + if(tail == null) { + // we found the type + return fieldType; + } else { + if(fieldType instanceof CompositeType) { + return ((CompositeType) fieldType).getTypeAt(tail); + } else { + throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); } - throw new RuntimeException("Unable to find field "+fieldExpression+" in type "+this+" (looking for '"+firstField+"')"); } } @@ -248,4 +314,23 @@ public class PojoTypeInfo extends CompositeType{ + ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]" + ">"; } + + public static class NamedFlatFieldDescriptor extends FlatFieldDescriptor { + + private String fieldName; + + public NamedFlatFieldDescriptor(String name, int keyPosition, TypeInformation type) { + super(keyPosition, type); + this.fieldName = name; + } + + public String getFieldName() { + return fieldName; + } + + @Override + public String toString() { + return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]"; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java index 22f7942..effec2b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -20,23 +20,32 @@ package org.apache.flink.api.java.typeutils; import java.util.Arrays; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.operators.Keys.ExpressionKeys; -import com.google.common.base.Preconditions; - public abstract class TupleTypeInfoBase extends CompositeType { - + + private final static String REGEX_FIELD = "(f?)([0-9]+)"; + private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; + private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS + +"|\\"+ExpressionKeys.SELECT_ALL_CHAR + +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA; + + private static final Pattern PATTERN_FIELD = Pattern.compile(REGEX_FIELD); + private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS); + private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD); + protected final TypeInformation[] types; protected final Class tupleType; private int totalFields; - + public TupleTypeInfoBase(Class tupleType, TypeInformation... types) { super(tupleType); this.tupleType = tupleType; @@ -90,82 +99,114 @@ public abstract class TupleTypeInfoBase extends CompositeType { startKeyId += type.getTotalFields(); } } - @Override - public void getKey(String fieldExpression, int offset, List result) { - // handle 'select all' - if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + public void getFlatFields(String fieldExpression, int offset, List result) { + + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); + if (!matcher.matches()) { + throw new InvalidFieldReferenceException("Invalid tuple field reference \""+fieldExpression+"\"."); + } + + String field = matcher.group(0); + if (field.equals(ExpressionKeys.SELECT_ALL_CHAR) || field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + // handle select all int keyPosition = 0; - for(TypeInformation type : types) { - if(type instanceof AtomicType) { - result.add(new FlatFieldDescriptor(offset + keyPosition, type)); - } else if(type instanceof CompositeType) { - CompositeType cType = (CompositeType)type; - cType.getKey(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); - keyPosition += cType.getTotalFields()-1; + for (TypeInformation type : types) { + if (type instanceof CompositeType) { + CompositeType cType = (CompositeType) type; + cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); + keyPosition += cType.getTotalFields() - 1; } else { - throw new RuntimeException("Unexpected key type: "+type); + result.add(new FlatFieldDescriptor(offset + keyPosition, type)); } keyPosition++; } return; } - // check input - if(fieldExpression.length() < 2) { - throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect. The length must be at least 2"); - } - if(fieldExpression.charAt(0) != 'f') { - throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect for a Tuple type. It has to start with an 'f'"); - } - // get first component of nested expression - int dotPos = fieldExpression.indexOf('.'); - String nestedSplitFirst = fieldExpression; - if(dotPos != -1 ) { - Preconditions.checkArgument(dotPos != fieldExpression.length()-1, "The field expression can never end with a dot."); - nestedSplitFirst = fieldExpression.substring(0, dotPos); - } - String fieldNumStr = nestedSplitFirst.substring(1, nestedSplitFirst.length()); - if(!StringUtils.isNumeric(fieldNumStr)) { - throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect. Field number '"+fieldNumStr+" is not numeric"); - } - int pos = -1; - try { - pos = Integer.valueOf(fieldNumStr); - } catch(NumberFormatException nfe) { - throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect. Field number '"+fieldNumStr+" is not numeric", nfe); + + String fieldStr = matcher.group(1); + Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr); + if (!fieldMatcher.matches()) { + throw new RuntimeException("Invalid matcher pattern"); } - if(pos < 0) { - throw new IllegalArgumentException("Negative position is not possible"); + field = fieldMatcher.group(2); + int fieldPos = Integer.valueOf(field); + + if (fieldPos >= this.getArity()) { + throw new InvalidFieldReferenceException("Tuple field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + "."); } - // pass down the remainder (after the dot) of the fieldExpression to the type at that position. - if(dotPos != -1) { // we need to go deeper - String rem = fieldExpression.substring(dotPos+1); - if( !(types[pos] instanceof CompositeType) ) { - throw new RuntimeException("Element at position "+pos+" is not a composite type. There are no nested types to select"); + TypeInformation fieldType = this.getTypeAt(fieldPos); + String tail = matcher.group(5); + if(tail == null) { + if(fieldType instanceof CompositeType) { + // forward offsets + for(int i=0; i cType = (CompositeType) types[pos]; - // count nested fields before "pos" - for (int i = 0; i < pos; i++) { - offset += types[i].getTotalFields(); + } else { + if(fieldType instanceof CompositeType) { + // forward offset + for(int i=0; i= types.length) { - throw new IllegalArgumentException("The specified tuple position does not exist"); + } + + public TypeInformation getTypeAt(String fieldExpression) { + + Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression); + if(!matcher.matches()) { + if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here."); + } else { + throw new InvalidFieldReferenceException("Invalid format of tuple field expression \""+fieldExpression+"\"."); + } } - - // count nested fields before "pos". - for(int i = 0; i < pos; i++) { - offset += types[i].getTotalFields() - 1; // this adds only something to offset if its a composite type. + + String fieldStr = matcher.group(1); + Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr); + if(!fieldMatcher.matches()) { + throw new RuntimeException("Invalid matcher pattern"); } - if(types[pos] instanceof CompositeType) { - throw new IllegalArgumentException("The specified field '"+fieldExpression+"' is refering to a composite type.\n" - + "Either select all elements in this type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' operator or specify a field in the sub-type"); + String field = fieldMatcher.group(2); + int fieldPos = Integer.valueOf(field); + + if(fieldPos >= this.getArity()) { + throw new InvalidFieldReferenceException("Tuple field expression \""+fieldStr+"\" out of bounds of "+this.toString()+"."); + } + TypeInformation fieldType = this.getTypeAt(fieldPos); + String tail = matcher.group(5); + if(tail == null) { + // we found the type + return fieldType; + } else { + if(fieldType instanceof CompositeType) { + return ((CompositeType) fieldType).getTypeAt(tail); + } else { + throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); + } } - result.add(new FlatFieldDescriptor(offset + pos, types[pos])); } public TypeInformation getTypeAt(int pos) {