flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [01/13] flink git commit: [FLINK-1328] Integrated forwarded Fields into the optimizer (incomplete)
Date Wed, 28 Jan 2015 01:24:01 GMT
Repository: flink
Updated Branches:
  refs/heads/master 82c420022 -> 805ea6943


http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
index e99cac7..7b90c8e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
@@ -144,7 +144,7 @@ public class Ordering {
 		}
 		
 		for (int i = 0; i < this.indexes.size(); i++) {
-			if (this.indexes.get(i) != otherOrdering.indexes.get(i)) {
+			if (this.indexes.get(i).intValue() != otherOrdering.indexes.get(i).intValue()) {
 				return false;
 			}
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
index ba801ec..da99018 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
@@ -27,7 +27,8 @@ import org.apache.flink.api.common.operators.util.FieldSet;
  * Container for the semantic properties associated to an operator.
  */
 public abstract class SemanticProperties implements Serializable {
-	
+	private boolean allFieldsConstant;
+
 	private static final long serialVersionUID = 1L;
 
 	/** Set of fields that are written in the destination record(s).*/
@@ -47,7 +48,19 @@ public abstract class SemanticProperties implements Serializable {
 			this.writtenFields = this.writtenFields.addFields(writtenFields);
 		}
 	}
-	
+
+	public void setAllFieldsConstant(boolean constant) {
+		this.allFieldsConstant = constant;
+	}
+
+	public boolean isAllFieldsConstant() {
+		return this.allFieldsConstant;
+	}
+
+	public abstract FieldSet getForwardFields(int input, int field);
+
+	public abstract FieldSet getSourceField(int input, int field);
+
 	/**
 	 * Sets the field(s) that are written in the destination record(s).
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
index 77ed1bc..abe995b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
@@ -36,7 +36,41 @@ public class SingleInputSemanticProperties extends SemanticProperties {
 	/** Set of fields that are read in the source record(s).*/
 	private FieldSet readFields;
 
-	
+	@Override
+	public FieldSet getForwardFields(int input, int field) {
+		if (input != 0) {
+			throw new IndexOutOfBoundsException();
+		}
+		return this.getForwardedField(field);
+	}
+
+	@Override
+	public FieldSet getSourceField(int input, int field) {
+		if (input != 0) {
+			throw new IndexOutOfBoundsException();
+		}
+
+		if (isAllFieldsConstant()) {
+			return new FieldSet(field);
+		}
+
+		return this.forwardedFrom(field);
+	}
+
+	public FieldSet forwardedFrom(int dest) {
+		FieldSet fs = null;
+		for (Map.Entry<Integer, FieldSet> entry : forwardedFields.entrySet()) {
+			if (entry.getValue().contains(dest)) {
+				if (fs == null) {
+					fs = new FieldSet();
+				}
+
+				fs = fs.addField(entry.getKey());
+			}
+		}
+		return fs;
+	}
+
 	public SingleInputSemanticProperties() {
 		init();
 	}
@@ -95,6 +129,10 @@ public class SingleInputSemanticProperties extends SemanticProperties
{
 	 * @return the destination fields, or null if they do not exist
 	 */
 	public FieldSet getForwardedField(int sourceField) {
+		if (isAllFieldsConstant()) {
+			return new FieldSet(sourceField);
+		}
+
 		return this.forwardedFields.get(sourceField);
 	}
 	
@@ -145,7 +183,12 @@ public class SingleInputSemanticProperties extends SemanticProperties
{
 				(forwardedFields == null || forwardedFields.isEmpty()) &&
 				(readFields == null || readFields.size() == 0);
 	}
-	
+
+	@Override
+	public String toString() {
+		return "SISP(" + this.forwardedFields + ")";
+	}
+
 	private void init() {
 		this.forwardedFields = new HashMap<Integer,FieldSet>();
 		this.readFields = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
index cb99e23..cb3b8d5 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
@@ -155,8 +155,8 @@ public class ConnectedComponents implements ProgramDescription {
 	 * a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function
 	 * produces a (Target-vertex-ID, Component-ID) pair.
 	 */
-	@ConstantFieldsFirst("1 -> 0")
-	@ConstantFieldsSecond("1 -> 1")
+	@ConstantFieldsFirst("1 -> 1")
+	@ConstantFieldsSecond("1 -> 0")
 	public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long,
Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		@Override
@@ -178,8 +178,6 @@ public class ConnectedComponents implements ProgramDescription {
 		}
 	}
 
-
-
 	@Override
 	public String getDescription() {
 		return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>";

http://git-wip-us.apache.org/repos/asf/flink/blob/78f41e9c/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
index 069678a..1e73b92 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
@@ -341,19 +341,19 @@ public class FunctionAnnotation {
 		
 		return semanticProperties;
 	}
-	
-	
+
+
 	private static final class ImplicitlyForwardingSingleInputSemanticProperties extends SingleInputSemanticProperties
{
 		private static final long serialVersionUID = 1L;
-		
+
 		private FieldSet nonForwardedFields;
-		
+
 		private ImplicitlyForwardingSingleInputSemanticProperties(FieldSet nonForwardedFields)
{
 			this.nonForwardedFields = nonForwardedFields;
 			addWrittenFields(nonForwardedFields);
 		}
-		
-		
+
+
 		/**
 		 * Returns the logical position where the given field is written to.
 		 * In this variant of the semantic properties, all fields are assumed implicitly forwarded,
@@ -362,52 +362,81 @@ public class FunctionAnnotation {
 		 */
 		@Override
 		public FieldSet getForwardedField(int sourceField) {
+			if (isAllFieldsConstant()) {
+				return new FieldSet(sourceField);
+			}
+
 			if (this.nonForwardedFields.contains(sourceField)) {
 				return null;
 			} else {
 				return new FieldSet(sourceField);
 			}
 		}
-		
+
+		@Override
+		public FieldSet getSourceField(int input, int field) {
+			if (input != 0) {
+				throw new IndexOutOfBoundsException();
+			}
+
+			if (isAllFieldsConstant()) {
+				return new FieldSet(field);
+			}
+
+			if (this.nonForwardedFields == null) {
+				return super.getSourceField(input, field);
+			}
+
+			if (this.nonForwardedFields.contains(field)) {
+				return null;
+			} else {
+				return new FieldSet(field);
+			}
+		}
+
 		@Override
 		public void addForwardedField(int sourceField, int destinationField) {
 			throw new UnsupportedOperationException("When defining fields as implicitly constant "
+
 					"(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded
fields.");
 		}
-		
+
 		@Override
 		public void addForwardedField(int sourceField, FieldSet destinationFields) {
 			throw new UnsupportedOperationException("When defining fields as implicitly constant "
+
 					"(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded
fields.");
 		}
-		
+
 		@Override
 		public void setForwardedField(int sourceField, FieldSet destinationFields) {
 			throw new UnsupportedOperationException("When defining fields as implicitly constant "
+
 					"(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded
fields.");
 		}
 	}
-	
+
 	private static final class ImplicitlyForwardingTwoInputSemanticProperties extends DualInputSemanticProperties
{
 		private static final long serialVersionUID = 1L;
-		
+
 		private FieldSet nonForwardedFields1;
 		private FieldSet nonForwardedFields2;
-		
+
 		private ImplicitlyForwardingTwoInputSemanticProperties() {}
-		
-		
+
+
 		public void setImplicitlyForwardingFirstExcept(FieldSet nonForwardedFields) {
 			this.nonForwardedFields1 = nonForwardedFields;
 		}
-		
+
 		public void setImplicitlyForwardingSecondExcept(FieldSet nonForwardedFields) {
 			this.nonForwardedFields2 = nonForwardedFields;
 		}
-		
+
 
 		@Override
 		public FieldSet getForwardedField1(int sourceField) {
+			if (isAllFieldsConstant()) {
+				return new FieldSet(sourceField);
+			}
+
 			if (this.nonForwardedFields1 == null) {
 				return super.getForwardedField1(sourceField);
 			}
@@ -419,9 +448,13 @@ public class FunctionAnnotation {
 				}
 			}
 		}
-		
+
 		@Override
 		public FieldSet getForwardedField2(int sourceField) {
+			if (isAllFieldsConstant()) {
+				return new FieldSet(sourceField);
+			}
+
 			if (this.nonForwardedFields2 == null) {
 				return super.getForwardedField2(sourceField);
 			}
@@ -433,7 +466,36 @@ public class FunctionAnnotation {
 				}
 			}
 		}
-		
+
+		@Override
+		public FieldSet getSourceField(int input, int field) {
+			if (input != 0 && input != 1) {
+				throw new IndexOutOfBoundsException();
+			}
+
+			if (isAllFieldsConstant()) {
+				return new FieldSet(field);
+			}
+
+			if (this.nonForwardedFields1 == null && this.nonForwardedFields2 == null) {
+				return super.getSourceField(input, field);
+			}
+
+			if (input == 0 && this.nonForwardedFields1 != null && this.nonForwardedFields1.contains(field))
{
+				return null;
+			} else if (input == 0) {
+				return new FieldSet(field);
+			}
+
+			if (input == 1 && this.nonForwardedFields2 != null && this.nonForwardedFields2.contains(field))
{
+				return null;
+			} else if (input == 1) {
+				return new FieldSet(field);
+			}
+
+			return null;
+		}
+
 		@Override
 		public void addForwardedField1(int sourceField, int destinationField) {
 			if (this.nonForwardedFields1 == null) {
@@ -444,7 +506,7 @@ public class FunctionAnnotation {
 						"(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add
forwarded fields.");
 			}
 		}
-		
+
 		@Override
 		public void addForwardedField1(int sourceField, FieldSet destinationFields) {
 			if (this.nonForwardedFields1 == null) {
@@ -455,7 +517,7 @@ public class FunctionAnnotation {
 						"(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add
forwarded fields.");
 			}
 		}
-		
+
 		@Override
 		public void setForwardedField1(int sourceField, FieldSet destinationFields) {
 			if (this.nonForwardedFields1 == null) {
@@ -466,7 +528,7 @@ public class FunctionAnnotation {
 						"(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add
forwarded fields.");
 			}
 		}
-		
+
 		@Override
 		public void addForwardedField2(int sourceField, int destinationField) {
 			if (this.nonForwardedFields2 == null) {
@@ -477,7 +539,7 @@ public class FunctionAnnotation {
 						"(such as through the ConstantFieldsSecondExcept annotation), you cannot manually add
forwarded fields.");
 			}
 		}
-		
+
 		@Override
 		public void addForwardedField2(int sourceField, FieldSet destinationFields) {
 			if (this.nonForwardedFields2 == null) {
@@ -488,7 +550,7 @@ public class FunctionAnnotation {
 						"(such as through the ConstantFieldsSecondExcept annotation), you cannot manually add
forwarded fields.");
 			}
 		}
-		
+
 		@Override
 		public void setForwardedField2(int sourceField, FieldSet destinationFields) {
 			if (this.nonForwardedFields2 == null) {


Mime
View raw message