flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [06/13] flink git commit: [FLINK-1328] [api-breaking][java-api][scala-api][optimizer] Reworked semantic annotations for functions. - Renamed constantField annotations to forwardedFields annotation - Forwarded fields can be defined for (nested) tuples, Po
Date Wed, 28 Jan 2015 01:24:06 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 0103a7b..a6cc8bd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -25,9 +25,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
-import org.apache.commons.lang3.Validate;
-import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -45,6 +45,15 @@ import com.google.common.base.Joiner;
  */
 public class PojoTypeInfo<T> extends CompositeType<T>{
 
+	private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
+	private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
+	private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
+			+"|\\"+ExpressionKeys.SELECT_ALL_CHAR
+			+"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+
+	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
+	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
+
 	private final Class<T> typeClass;
 
 	private PojoField[] fields;
@@ -103,62 +112,119 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
 		return Comparable.class.isAssignableFrom(typeClass);
 	}
 	
+
 	@Override
-	public void getKey(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
-		// handle 'select all' first
-		if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+	public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
+
+		Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+		if(!matcher.matches()) {
+			throw new InvalidFieldReferenceException("Invalid POJO field reference \""+fieldExpression+"\".");
+		}
+
+		String field = matcher.group(0);
+		if(field.equals(ExpressionKeys.SELECT_ALL_CHAR) || field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+			// handle select all
 			int keyPosition = 0;
-			for(PojoField field : fields) {
-				if(field.type instanceof AtomicType) {
-					result.add(new FlatFieldDescriptor(offset + keyPosition, field.type));
-				} else if(field.type instanceof CompositeType) {
-					CompositeType<?> cType = (CompositeType<?>)field.type;
-					cType.getKey(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result);
+			for(PojoField pField : fields) {
+				if(pField.type instanceof CompositeType) {
+					CompositeType<?> cType = (CompositeType<?>)pField.type;
+					cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result);
 					keyPosition += cType.getTotalFields()-1;
 				} else {
-					throw new RuntimeException("Unexpected key type: "+field.type);
+					result.add(new NamedFlatFieldDescriptor(pField.field.getName(), offset + keyPosition, pField.type));
 				}
 				keyPosition++;
 			}
 			return;
+		} else {
+			field = matcher.group(1);
+		}
+
+		// get field
+		int fieldPos = -1;
+		TypeInformation<?> fieldType = null;
+		for (int i = 0; i < fields.length; i++) {
+			if (fields[i].field.getName().equals(field)) {
+				fieldPos = i;
+				fieldType = fields[i].type;
+				break;
+			}
+		}
+		if (fieldPos == -1) {
+			throw new InvalidFieldReferenceException("Unable to find field \""+field+"\" in type "+this+".");
 		}
-		Validate.notEmpty(fieldExpression, "Field expression must not be empty.");
-		// if there is a dot try getting the field from that sub field
-		int firstDot = fieldExpression.indexOf('.');
-		if (firstDot == -1) {
-			// this is the last field (or only field) in the field expression
-			int fieldId = 0;
-			for (int i = 0; i < fields.length; i++) {
-				if(fields[i].type instanceof CompositeType) {
-					fieldId += fields[i].type.getTotalFields()-1;
+		String tail = matcher.group(3);
+		if(tail == null) {
+			if(fieldType instanceof CompositeType) {
+				// forward offset
+				for(int i=0; i<fieldPos; i++) {
+					offset += this.getTypeAt(i).getTotalFields();
 				}
-				if (fields[i].field.getName().equals(fieldExpression)) {
-					if(fields[i].type instanceof CompositeType) {
-						throw new IllegalArgumentException("The specified field '"+fieldExpression+"' is refering to a composite type.\n"
-								+ "Either select all elements in this type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' operator or specify a field in the sub-type");
-					}
-					result.add(new FlatFieldDescriptor(offset + fieldId, fields[i].type));
-					return;
+				// add all fields of composite type
+				((CompositeType) fieldType).getFlatFields("*", offset, result);
+				return;
+			} else {
+				// we found the field to add
+				// compute flat field position by adding skipped fields
+				int flatFieldPos = offset;
+				for(int i=0; i<fieldPos; i++) {
+					flatFieldPos += this.getTypeAt(i).getTotalFields();
 				}
-				fieldId++;
+				result.add(new FlatFieldDescriptor(flatFieldPos, fieldType));
+				// nothing left to do
+				return;
 			}
 		} else {
-			// split and go deeper
-			String firstField = fieldExpression.substring(0, firstDot);
-			String rest = fieldExpression.substring(firstDot + 1);
-			int fieldId = 0;
-			for (int i = 0; i < fields.length; i++) {
-				if (fields[i].field.getName().equals(firstField)) {
-					if (!(fields[i].type instanceof CompositeType<?>)) {
-						throw new RuntimeException("Field "+fields[i].type+" (specified by '"+fieldExpression+"') is not a composite type");
-					}
-					CompositeType<?> cType = (CompositeType<?>) fields[i].type;
-					cType.getKey(rest, offset + fieldId, result); // recurse
-					return;
+			if(fieldType instanceof CompositeType<?>) {
+				// forward offset
+				for(int i=0; i<fieldPos; i++) {
+					offset += this.getTypeAt(i).getTotalFields();
 				}
-				fieldId += fields[i].type.getTotalFields();
+				((CompositeType) fieldType).getFlatFields(tail, offset, result);
+				// nothing left to do
+				return;
+			} else {
+				throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
+			}
+		}
+	}
+
+	public TypeInformation<?> getTypeAt(String fieldExpression) {
+
+		Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression);
+		if(!matcher.matches()) {
+			if (fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+				throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
+			} else {
+				throw new InvalidFieldReferenceException("Invalid format of POJO field expression \""+fieldExpression+"\".");
+			}
+		}
+
+		String field = matcher.group(1);
+		// get field
+		int fieldPos = -1;
+		TypeInformation<?> fieldType = null;
+		for (int i = 0; i < fields.length; i++) {
+			if (fields[i].field.getName().equals(field)) {
+				fieldPos = i;
+				fieldType = fields[i].type;
+				break;
+			}
+		}
+		if (fieldPos == -1) {
+			throw new InvalidFieldReferenceException("Unable to find field \""+field+"\" in type "+this+".");
+		}
+
+		String tail = matcher.group(3);
+		if(tail == null) {
+			// we found the type
+			return fieldType;
+		} else {
+			if(fieldType instanceof CompositeType<?>) {
+				return ((CompositeType) fieldType).getTypeAt(tail);
+			} else {
+				throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
 			}
-			throw new RuntimeException("Unable to find field "+fieldExpression+" in type "+this+" (looking for '"+firstField+"')");
 		}
 	}
 
@@ -248,4 +314,23 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
 				+ ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]"
 				+ ">";
 	}
+
+	public static class NamedFlatFieldDescriptor extends FlatFieldDescriptor {
+
+		private String fieldName;
+
+		public NamedFlatFieldDescriptor(String name, int keyPosition, TypeInformation<?> type) {
+			super(keyPosition, type);
+			this.fieldName = name;
+		}
+
+		public String getFieldName() {
+			return fieldName;
+		}
+
+		@Override
+		public String toString() {
+			return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 22f7942..effec2b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -20,23 +20,32 @@ package org.apache.flink.api.java.typeutils;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 
-import com.google.common.base.Preconditions;
-
 public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
-	
+
+	private final static String REGEX_FIELD = "(f?)([0-9]+)";
+	private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
+	private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
+			+"|\\"+ExpressionKeys.SELECT_ALL_CHAR
+			+"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+
+	private static final Pattern PATTERN_FIELD = Pattern.compile(REGEX_FIELD);
+	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
+	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
+
 	protected final TypeInformation<?>[] types;
 	
 	protected final Class<T> tupleType;
 
 	private int totalFields;
-	
+
 	public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... types) {
 		super(tupleType);
 		this.tupleType = tupleType;
@@ -90,82 +99,114 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 			startKeyId += type.getTotalFields();
 		}
 	}
-	
 
 	@Override
-	public void getKey(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
-		// handle 'select all'
-		if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+	public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
+
+		Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+		if (!matcher.matches()) {
+			throw new InvalidFieldReferenceException("Invalid tuple field reference \""+fieldExpression+"\".");
+		}
+
+		String field = matcher.group(0);
+		if (field.equals(ExpressionKeys.SELECT_ALL_CHAR) || field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+			// handle select all
 			int keyPosition = 0;
-			for(TypeInformation<?> type : types) {
-				if(type instanceof AtomicType) {
-					result.add(new FlatFieldDescriptor(offset + keyPosition, type));
-				} else if(type instanceof CompositeType) {
-					CompositeType<?> cType = (CompositeType<?>)type;
-					cType.getKey(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result);
-					keyPosition += cType.getTotalFields()-1;
+			for (TypeInformation<?> type : types) {
+				if (type instanceof CompositeType) {
+					CompositeType<?> cType = (CompositeType<?>) type;
+					cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result);
+					keyPosition += cType.getTotalFields() - 1;
 				} else {
-					throw new RuntimeException("Unexpected key type: "+type);
+					result.add(new FlatFieldDescriptor(offset + keyPosition, type));
 				}
 				keyPosition++;
 			}
 			return;
 		}
-		// check input
-		if(fieldExpression.length() < 2) {
-			throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect. The length must be at least 2");
-		}
-		if(fieldExpression.charAt(0) != 'f') {
-			throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect for a Tuple type. It has to start with an 'f'");
-		}
-		// get first component of nested expression
-		int dotPos = fieldExpression.indexOf('.');
-		String nestedSplitFirst = fieldExpression;
-		if(dotPos != -1 ) {
-			Preconditions.checkArgument(dotPos != fieldExpression.length()-1, "The field expression can never end with a dot.");
-			nestedSplitFirst = fieldExpression.substring(0, dotPos);
-		}
-		String fieldNumStr = nestedSplitFirst.substring(1, nestedSplitFirst.length());
-		if(!StringUtils.isNumeric(fieldNumStr)) {
-			throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect. Field number '"+fieldNumStr+" is not numeric");
-		}
-		int pos = -1;
-		try {
-			pos = Integer.valueOf(fieldNumStr);
-		} catch(NumberFormatException nfe) {
-			throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect. Field number '"+fieldNumStr+" is not numeric", nfe);
+
+		String fieldStr = matcher.group(1);
+		Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
+		if (!fieldMatcher.matches()) {
+			throw new RuntimeException("Invalid matcher pattern");
 		}
-		if(pos < 0) {
-			throw new IllegalArgumentException("Negative position is not possible");
+		field = fieldMatcher.group(2);
+		int fieldPos = Integer.valueOf(field);
+
+		if (fieldPos >= this.getArity()) {
+			throw new InvalidFieldReferenceException("Tuple field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + ".");
 		}
-		// pass down the remainder (after the dot) of the fieldExpression to the type at that position.
-		if(dotPos != -1) { // we need to go deeper
-			String rem = fieldExpression.substring(dotPos+1);
-			if( !(types[pos] instanceof CompositeType<?>) ) {
-				throw new RuntimeException("Element at position "+pos+" is not a composite type. There are no nested types to select");
+		TypeInformation<?> fieldType = this.getTypeAt(fieldPos);
+		String tail = matcher.group(5);
+		if(tail == null) {
+			if(fieldType instanceof CompositeType) {
+				// forward offsets
+				for(int i=0; i<fieldPos; i++) {
+					offset += this.getTypeAt(i).getTotalFields();
+				}
+				// add all fields of composite type
+				((CompositeType) fieldType).getFlatFields("*", offset, result);
+				return;
+			} else {
+				// we found the field to add
+				// compute flat field position by adding skipped fields
+				int flatFieldPos = offset;
+				for(int i=0; i<fieldPos; i++) {
+					flatFieldPos += this.getTypeAt(i).getTotalFields();
+				}
+				result.add(new FlatFieldDescriptor(flatFieldPos, fieldType));
+				// nothing left to do
+				return;
 			}
-			CompositeType<?> cType = (CompositeType<?>) types[pos];
-			// count nested fields before "pos"
-			for (int i = 0; i < pos; i++) {
-				offset += types[i].getTotalFields();
+		} else {
+			if(fieldType instanceof CompositeType<?>) {
+				// forward offset
+				for(int i=0; i<fieldPos; i++) {
+					offset += this.getTypeAt(i).getTotalFields();
+				}
+				((CompositeType) fieldType).getFlatFields(tail, offset, result);
+				// nothing left to do
+				return;
+			} else {
+				throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
 			}
-			cType.getKey(rem, offset, result);
-			return;
 		}
-		
-		if(pos >= types.length) {
-			throw new IllegalArgumentException("The specified tuple position does not exist");
+	}
+
+	public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+
+		Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression);
+		if(!matcher.matches()) {
+			if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+				throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
+			} else {
+				throw new InvalidFieldReferenceException("Invalid format of tuple field expression \""+fieldExpression+"\".");
+			}
 		}
-		
-		// count nested fields before "pos".
-		for(int i = 0; i < pos; i++) {
-			offset += types[i].getTotalFields() - 1; // this adds only something to offset if its a composite type.
+
+		String fieldStr = matcher.group(1);
+		Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
+		if(!fieldMatcher.matches()) {
+			throw new RuntimeException("Invalid matcher pattern");
 		}
-		if(types[pos] instanceof CompositeType) {
-			throw new IllegalArgumentException("The specified field '"+fieldExpression+"' is refering to a composite type.\n"
-					+ "Either select all elements in this type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' operator or specify a field in the sub-type");
+		String field = fieldMatcher.group(2);
+		int fieldPos = Integer.valueOf(field);
+
+		if(fieldPos >= this.getArity()) {
+			throw new InvalidFieldReferenceException("Tuple field expression \""+fieldStr+"\" out of bounds of "+this.toString()+".");
+		}
+		TypeInformation<X> fieldType = this.getTypeAt(fieldPos);
+		String tail = matcher.group(5);
+		if(tail == null) {
+			// we found the type
+			return fieldType;
+		} else {
+			if(fieldType instanceof CompositeType<?>) {
+				return ((CompositeType) fieldType).getTypeAt(tail);
+			} else {
+				throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
+			}
 		}
-		result.add(new FlatFieldDescriptor(offset + pos, types[pos]));
 	}
 	
 	public <X> TypeInformation<X> getTypeAt(int pos) {


Mime
View raw message