flink-commits mailing list archives

From fhue...@apache.org
Subject [11/16] flink git commit: [FLINK-2901] Remove Record API dependencies from flink-tests #2
Date Tue, 24 Nov 2015 17:17:54 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointInFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointInFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointInFormat.java
deleted file mode 100644
index c5dd8ec..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointInFormat.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-/**
- * Generates records with an id and a CoordVector.
- * The input format is line-based, i.e. one record is read from one line
- * which is terminated by '\n'. Within a line the first '|' character separates
- * the id from the CoordVector. The vector consists of decimal values,
- * which are separated by '|' as well. The id is the id of a data point or
- * cluster center and the CoordVector the corresponding position (coordinate
- * vector) of the data point or cluster center. Example line:
- * "42|23.23|52.57|74.43|" -> Id: 42, coordinate vector: (23.23, 52.57, 74.43)
- */
-public class PointInFormat extends DelimitedInputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	private final IntValue idInteger = new IntValue();
-	private final CoordVector point = new CoordVector();
-	
-	private final List<Double> dimensionValues = new ArrayList<Double>();
-	private double[] pointValues = new double[0];
-	
-	@Override
-	public Record readRecord(Record record, byte[] line, int offset, int numBytes) {
-		
-		final int limit = offset + numBytes;
-		
-		int id = -1;
-		int value = 0;
-		int fractionValue = 0;
-		int fractionChars = 0;
-		boolean negative = false;
-		
-		this.dimensionValues.clear();
-
-		for (int pos = offset; pos < limit; pos++) {
-			if (line[pos] == '|') {
-				// check if id was already set
-				if (id == -1) {
-					id = value;
-				}
-				else {
-					double v = value + ((double) fractionValue) * Math.pow(10, (-1 * (fractionChars - 1)));
-					this.dimensionValues.add(negative ? -v : v);
-				}
-				// reset value
-				value = 0;
-				fractionValue = 0;
-				fractionChars = 0;
-				negative = false;
-			} else if (line[pos] == '.') {
-				fractionChars = 1;
-			} else if (line[pos] == '-') {
-				negative = true;
-			} else {
-				if (fractionChars == 0) {
-					value *= 10;
-					value += line[pos] - '0';
-				} else {
-					fractionValue *= 10;
-					fractionValue += line[pos] - '0';
-					fractionChars++;
-				}
-			}
-		}
-
-		// set the ID
-		this.idInteger.setValue(id);
-		record.setField(0, this.idInteger);
-		
-		// set the data points
-		if (this.pointValues.length != this.dimensionValues.size()) {
-			this.pointValues = new double[this.dimensionValues.size()];
-		}
-		for (int i = 0; i < this.pointValues.length; i++) {
-			this.pointValues[i] = this.dimensionValues.get(i);
-		}
-		
-		this.point.setCoordinates(this.pointValues);
-		record.setField(1, this.point);
-		return record;
-	}
-}
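
For reference, a minimal standalone sketch of the line format the deleted PointInFormat reads ("<id>|<coord>|<coord>|...|"). It uses String.split and Double.parseDouble for brevity, whereas the deleted readRecord parses the bytes in place, accumulating integer and fraction digits separately, to avoid String allocations on the hot path. The class name PointLineParser is illustrative.

import java.util.Arrays;

public class PointLineParser {

    public static void main(String[] args) {
        String line = "42|23.23|52.57|74.43|";

        // split on '|'; trailing empty fields are dropped by String.split
        String[] parts = line.split("\\|");

        int id = Integer.parseInt(parts[0]);
        double[] coords = new double[parts.length - 1];
        for (int i = 1; i < parts.length; i++) {
            coords[i - 1] = Double.parseDouble(parts[i]);
        }

        System.out.println("Id: " + id + " Coordinate vector: " + Arrays.toString(coords));
    }
}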

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointOutFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointOutFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointOutFormat.java
deleted file mode 100644
index 410397e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointOutFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.text.DecimalFormat;
-import java.text.DecimalFormatSymbols;
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-/**
- * Writes records that contain an id and a CoordVector.
- * The output format is line-based, i.e. one record is written to
- * a line and terminated with '\n'. Within a line the first '|' character
- * separates the id from the CoordVector. The vector consists of decimal
- * values, which are separated by '|' as well. The id is the id of a data
- * point or cluster center and the vector the corresponding position
- * (coordinate vector) of the data point or cluster center. Example line:
- * "42|23.23|52.57|74.43|" -> Id: 42, coordinate vector: (23.23, 52.57, 74.43)
- */
-public class PointOutFormat extends DelimitedOutputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	private final DecimalFormat df = new DecimalFormat("####0.00");
-	private final StringBuilder line = new StringBuilder();
-	
-	
-	public PointOutFormat() {
-		DecimalFormatSymbols dfSymbols = new DecimalFormatSymbols();
-		dfSymbols.setDecimalSeparator('.');
-		this.df.setDecimalFormatSymbols(dfSymbols);
-	}
-	
-	@Override
-	public int serializeRecord(Record record, byte[] target) {
-		
-		line.setLength(0);
-		
-		IntValue centerId = record.getField(0, IntValue.class);
-		CoordVector centerPos = record.getField(1, CoordVector.class);
-		
-		
-		line.append(centerId.getValue());
-
-		for (double coord : centerPos.getCoordinates()) {
-			line.append('|');
-			line.append(df.format(coord));
-		}
-		line.append('|');
-		
-		byte[] byteString = line.toString().getBytes();
-		
-		if (byteString.length <= target.length) {
-			System.arraycopy(byteString, 0, target, 0, byteString.length);
-			return byteString.length;
-		}
-		else {
-			return -byteString.length;
-		}
-	}
-}
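
Two details of the deleted PointOutFormat are worth keeping in mind: it pins the DecimalFormat's decimal separator to '.' so the output does not depend on the JVM's default locale, and serializeRecord returns the negated record length when the target buffer is too small, which signals the caller to retry with a larger buffer. A standalone sketch of the locale-safe formatting (class name illustrative):

import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;

public class PointFormatting {

    public static void main(String[] args) {
        // pin the decimal separator to '.' regardless of the default locale
        DecimalFormatSymbols symbols = new DecimalFormatSymbols();
        symbols.setDecimalSeparator('.');
        DecimalFormat df = new DecimalFormat("####0.00");
        df.setDecimalFormatSymbols(symbols);

        StringBuilder line = new StringBuilder();
        line.append(42);
        for (double coord : new double[] {23.23, 52.57, 74.43}) {
            line.append('|').append(df.format(coord));
        }
        line.append('|');

        System.out.println(line); // 42|23.23|52.57|74.43|
    }
}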

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java
deleted file mode 100644
index 89e222b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * Reduce PACT computes the new position (coordinate vector) of a cluster
- * center. This is an average computation. Hence, the function is annotated
- * as Combinable and implements the combine method.
- * 
- * Output Format:
- * 0: clusterID
- * 1: clusterVector
- */
-@SuppressWarnings("deprecation")
-@Combinable
-@ConstantFields(0)
-public class RecomputeClusterCenter extends ReduceFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-	
-	private final IntValue count = new IntValue();
-	
-	/**
-	 * Compute the new position (coordinate vector) of a cluster center.
-	 */
-	@Override
-	public void reduce(Iterator<Record> dataPoints, Collector<Record> out) {
-		Record next = null;
-			
-		// initialize coordinate vector sum and count
-		CoordVector coordinates = new CoordVector();
-		double[] coordinateSum = null;
-		int count = 0;	
-
-		// compute coordinate vector sum and count
-		while (dataPoints.hasNext()) {
-			next = dataPoints.next();
-			
-			// get the coordinates and the count from the record
-			double[] thisCoords = next.getField(1, CoordVector.class).getCoordinates();
-			int thisCount = next.getField(2, IntValue.class).getValue();
-			
-			if (coordinateSum == null) {
-				if (coordinates.getCoordinates() != null) {
-					coordinateSum = coordinates.getCoordinates();
-				}
-				else {
-					coordinateSum = new double[thisCoords.length];
-				}
-			}
-
-			addToCoordVector(coordinateSum, thisCoords);
-			count += thisCount;
-		}
-
-		// compute new coordinate vector (position) of cluster center
-		for (int i = 0; i < coordinateSum.length; i++) {
-			coordinateSum[i] /= count;
-		}
-		
-		coordinates.setCoordinates(coordinateSum);
-		next.setField(1, coordinates);
-		next.setNull(2);
-
-		// emit new position of cluster center
-		out.collect(next);
-	}
-
-	/**
-	 * Computes a pre-aggregated average value of a coordinate vector.
-	 */
-	@Override
-	public void combine(Iterator<Record> dataPoints, Collector<Record> out) {
-		
-		Record next = null;
-		
-		// initialize coordinate vector sum and count
-		CoordVector coordinates = new CoordVector();
-		double[] coordinateSum = null;
-		int count = 0;	
-
-		// compute coordinate vector sum and count
-		while (dataPoints.hasNext()) {
-			next = dataPoints.next();
-			
-			// get the coordinates and the count from the record
-			double[] thisCoords = next.getField(1, CoordVector.class).getCoordinates();
-			int thisCount = next.getField(2, IntValue.class).getValue();
-			
-			if (coordinateSum == null) {
-				if (coordinates.getCoordinates() != null) {
-					coordinateSum = coordinates.getCoordinates();
-				}
-				else {
-					coordinateSum = new double[thisCoords.length];
-				}
-			}
-
-			addToCoordVector(coordinateSum, thisCoords);
-			count += thisCount;
-		}
-		
-		coordinates.setCoordinates(coordinateSum);
-		this.count.setValue(count);
-		next.setField(1, coordinates);
-		next.setField(2, this.count);
-		
-		// emit partial sum and partial count for average computation
-		out.collect(next);
-	}
-
-	/**
-	 * Adds two coordinate vectors by summing up each of their coordinates.
-	 * 
-	 * @param cvToAddTo
-	 *        The coordinate vector to which the other vector is added.
-	 *        This vector is returned.
-	 * @param cvToBeAdded
-	 *        The coordinate vector which is added to the other vector.
-	 *        This vector is not modified.
-	 */
-	private void addToCoordVector(double[] cvToAddTo, double[] cvToBeAdded) {
-		// check if both vectors have same length
-		if (cvToAddTo.length != cvToBeAdded.length) {
-			throw new IllegalArgumentException("The given coordinate vectors are not of equal length.");
-		}
-
-		// sum coordinate vectors coordinate-wise
-		for (int i = 0; i < cvToAddTo.length; i++) {
-			cvToAddTo[i] += cvToBeAdded[i];
-		}
-	}
-}
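
The reason combine emits (partial sum, partial count) pairs rather than partial averages: averages of averages are wrong for groups of unequal size, while sums and counts merge exactly and the final reduce divides once. A standalone sketch of that invariant (all names illustrative):

import java.util.Arrays;

public class CombinableAverage {

    // partial aggregate: component-wise coordinate sum plus element count
    static final class Partial {
        final double[] sum;
        final int count;

        Partial(double[] sum, int count) {
            this.sum = sum;
            this.count = count;
        }

        // merging two partials is exact, unlike averaging two averages
        Partial merge(Partial other) {
            double[] merged = sum.clone();
            for (int i = 0; i < merged.length; i++) {
                merged[i] += other.sum[i];
            }
            return new Partial(merged, count + other.count);
        }

        double[] average() {
            double[] avg = sum.clone();
            for (int i = 0; i < avg.length; i++) {
                avg[i] /= count;
            }
            return avg;
        }
    }

    public static void main(String[] args) {
        // two combiner outputs for the same cluster: 3 points and 1 point
        Partial a = new Partial(new double[] {3.0, 9.0}, 3);
        Partial b = new Partial(new double[] {1.0, 1.0}, 1);
        System.out.println(Arrays.toString(a.merge(b).average())); // [1.0, 2.5]
    }
}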

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java
deleted file mode 100644
index b948804..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class MergeOnlyJoin implements Program {
-
-	private static final long serialVersionUID = 1L;
-
-	@ConstantFieldsFirstExcept(2)
-	public static class JoinInputs extends JoinFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void join(Record input1, Record input2, Collector<Record> out) {
-			input1.setField(2, input2.getField(1, IntValue.class));
-			out.collect(input1);
-		}
-	}
-
-	@ConstantFieldsExcept({})
-	public static class DummyReduce extends ReduceFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void reduce(Iterator<Record> values, Collector<Record> out) {
-			while (values.hasNext()) {
-				out.collect(values.next());
-			}
-		}
-	}
-
-
-	@Override
-	public Plan getPlan(final String... args) {
-		// parse program parameters
-		int numSubtasks       = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		String input1Path    = (args.length > 1 ? args[1] : "");
-		String input2Path    = (args.length > 2 ? args[2] : "");
-		String output        = (args.length > 3 ? args[3] : "");
-		int numSubtasksInput2 = (args.length > 4 ? Integer.parseInt(args[4]) : 1);
-
-		// create DataSourceContract for Orders input
-		@SuppressWarnings("unchecked")
-		CsvInputFormat format1 = new CsvInputFormat('|', IntValue.class, IntValue.class);
-		FileDataSource input1 = new FileDataSource(format1, input1Path, "Input 1");
-		
-		ReduceOperator aggInput1 = ReduceOperator.builder(DummyReduce.class, IntValue.class, 0)
-			.input(input1)
-			.name("AggOrders")
-			.build();
-
-		
-		// create DataSourceContract for Lines input
-		@SuppressWarnings("unchecked")
-		CsvInputFormat format2 = new CsvInputFormat('|', IntValue.class, IntValue.class);
-		FileDataSource input2 = new FileDataSource(format2, input2Path, "Input 2");
-		input2.setParallelism(numSubtasksInput2);
-
-		ReduceOperator aggInput2 = ReduceOperator.builder(DummyReduce.class, IntValue.class, 0)
-			.input(input2)
-			.name("AggLines")
-			.build();
-		aggInput2.setParallelism(numSubtasksInput2);
-		
-		// create JoinOperator for joining Orders and LineItems
-		JoinOperator joinLiO = JoinOperator.builder(JoinInputs.class, IntValue.class, 0, 0)
-			.input1(aggInput1)
-			.input2(aggInput2)
-			.name("JoinLiO")
-			.build();
-
-		// create DataSinkContract for writing the result
-		FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, joinLiO, "Output");
-		CsvOutputFormat.configureRecordFormat(result)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.lenient(true)
-			.field(IntValue.class, 0)
-			.field(IntValue.class, 1)
-			.field(IntValue.class, 2);
-		
-		// assemble the PACT plan
-		Plan plan = new Plan(result, "Merge Only Join");
-		plan.setDefaultParallelism(numSubtasks);
-		return plan;
-	}
-}
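
Since FLINK-2901 ports these tests off the Record API, roughly the same join shape in the DataSet API might look as follows. This is only a sketch, not part of this commit: inline sample data stands in for the pipe-delimited CSV sources, and the class name is illustrative.

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class MergeOnlyJoinSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // stand-ins for the two (int, int) CSV inputs
        DataSet<Tuple2<Integer, Integer>> input1 =
            env.fromElements(new Tuple2<>(1, 10), new Tuple2<>(2, 20));
        DataSet<Tuple2<Integer, Integer>> input2 =
            env.fromElements(new Tuple2<>(1, 100), new Tuple2<>(2, 200));

        // equi-join on field 0, appending input2's value field -- the same
        // shape as the deleted JoinInputs UDF
        input1.join(input2).where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>,
                    Tuple3<Integer, Integer, Integer>>() {
                @Override
                public Tuple3<Integer, Integer, Integer> join(
                        Tuple2<Integer, Integer> left, Tuple2<Integer, Integer> right) {
                    return new Tuple3<>(left.f0, left.f1, right.f1);
                }
            })
            .print();
    }
}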

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java
deleted file mode 100644
index d805b92..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.recordJobs.relational.query1Util.GroupByReturnFlag;
-import org.apache.flink.test.recordJobs.relational.query1Util.LineItemFilter;
-import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
-import org.apache.flink.test.recordJobs.util.StringTupleDataOutFormat;
-import org.apache.flink.types.StringValue;
-
-@SuppressWarnings("deprecation")
-public class TPCHQuery1 implements Program, ProgramDescription {
-
-	private static final long serialVersionUID = 1L;
-
-	private int parallelism = 1;
-	private String lineItemInputPath;
-	private String outputPath;
-	
-	@Override
-	public Plan getPlan(String... args) throws IllegalArgumentException {
-		
-		
-		if (args.length != 3) {
-			this.parallelism = 1;
-			this.lineItemInputPath = "";
-			this.outputPath = "";
-		} else {
-			this.parallelism = Integer.parseInt(args[0]);
-			this.lineItemInputPath = args[1];
-			this.outputPath = args[2];
-		}
-		
-		FileDataSource lineItems =
-			new FileDataSource(new IntTupleDataInFormat(), this.lineItemInputPath, "LineItems");
-		lineItems.setParallelism(this.parallelism);
-		
-		FileDataSink result = 
-			new FileDataSink(new StringTupleDataOutFormat(), this.outputPath, "Output");
-		result.setParallelism(this.parallelism);
-		
-		MapOperator lineItemFilter = 
-			MapOperator.builder(new LineItemFilter())
-			.name("LineItem Filter")
-			.build();
-		lineItemFilter.setParallelism(this.parallelism);
-		
-		ReduceOperator groupByReturnFlag = 
-			ReduceOperator.builder(new GroupByReturnFlag(), StringValue.class, 0)
-			.name("groupyBy")
-			.build();
-		
-		lineItemFilter.setInput(lineItems);
-		groupByReturnFlag.setInput(lineItemFilter);
-		result.setInput(groupByReturnFlag);
-		
-		return new Plan(result, "TPC-H 1");
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [parallelism] [lineitem-input] [output]";
-	}
-}
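
The plan above wires source -> filter -> groupBy/reduce -> sink via setInput. A hedged DataSet API sketch of the same pipeline, with illustrative (returnflag, extendedprice) sample data in place of the lineitem source:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class GroupByReturnFlagSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // stand-in for the plan-wide parallelism setting

        // (returnflag, extendedprice) pairs standing in for parsed lineitems
        DataSet<Tuple2<String, Double>> lineitems = env.fromElements(
            new Tuple2<>("R", 10.0), new Tuple2<>("N", 5.0), new Tuple2<>("R", 2.5));

        lineitems
            .filter(new FilterFunction<Tuple2<String, Double>>() {
                @Override
                public boolean filter(Tuple2<String, Double> t) {
                    return t.f1 > 3.0; // stand-in for the LineItemFilter predicate
                }
            })
            .groupBy(0) // group by return flag, like GroupByReturnFlag
            .sum(1)
            .print();
    }
}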

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java
deleted file mode 100644
index 4bb0cdf..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.text.DecimalFormatSymbols;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
-import org.apache.flink.test.recordJobs.util.Tuple;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings({"serial", "deprecation"})
-public class TPCHQuery10 implements Program, ProgramDescription {
-	
-	// --------------------------------------------------------------------------------------------
-	//                         Local Filters and Projections
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Forwards (0 = orderkey, 1 = custkey).
-	 */
-	public static class FilterO extends MapFunction
-	{
-		private static final int YEAR_FILTER = 1990;
-		
-		private final IntValue custKey = new IntValue();
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			
-			Tuple t = record.getField(1, Tuple.class);
-			if (Integer.parseInt(t.getStringValueAt(4).substring(0, 4)) > FilterO.YEAR_FILTER) {
-				// project
-				this.custKey.setValue((int) t.getLongValueAt(1));
-				record.setField(1, this.custKey);
-				out.collect(record);
-			}
-			
-		}
-	}
-
-	/**
-	 * Forwards (0 = lineitem, 1 = tuple (extendedprice, discount) )
-	 */
-	public static class FilterLI extends MapFunction
-	{
-		private final Tuple tuple = new Tuple();
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception
-		{
-			Tuple t = record.getField(1, this.tuple);
-			if (t.getStringValueAt(8).equals("R")) {
-				t.project(0x60); // l_extendedprice, l_discount
-				record.setField(1, t);
-				out.collect(record);
-			}
-		}
-	}
-	
-	/**
-	 * Returns (0 = custkey, 1 = custName, 2 = NULL, 3 = balance, 4 = nationkey, 5 = address, 6 = phone, 7 = comment)
-	 */
-	public static class ProjectC extends MapFunction {
-
-		private final Tuple tuple = new Tuple();
-		
-		private final StringValue custName = new StringValue();
-		
-		private final StringValue balance = new StringValue();
-		private final IntValue nationKey = new IntValue();
-		private final StringValue address = new StringValue();
-		private final StringValue phone = new StringValue();
-		private final StringValue comment = new StringValue();
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception
-		{
-			final Tuple t = record.getField(1, this.tuple);
-			
-			this.custName.setValue(t.getStringValueAt(1));
-			this.address.setValue(t.getStringValueAt(2));
-			this.nationKey.setValue((int) t.getLongValueAt(3));
-			this.phone.setValue(t.getStringValueAt(4));
-			this.balance.setValue(t.getStringValueAt(5));
-			this.comment.setValue(t.getStringValueAt(7));
-			
-			record.setField(1, this.custName);
-			record.setField(3, this.balance);
-			record.setField(4, this.nationKey);
-			record.setField(5, this.address);
-			record.setField(6, this.phone);
-			record.setField(7, this.comment);
-			
-			out.collect(record);
-		}
-	}
-	
-	/**
-	 * Returns (0 = nationkey, 1 = nation_name)
-	 */
-	public static class ProjectN extends MapFunction
-	{
-		private final Tuple tuple = new Tuple();
-		private final StringValue nationName = new StringValue();
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception
-		{
-			final Tuple t = record.getField(1, this.tuple);
-			
-			this.nationName.setValue(t.getStringValueAt(1));
-			record.setField(1, this.nationName);
-			out.collect(record);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                                        Joins
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Returns (0 = custKey, 1 = tuple (extendedprice, discount) )
-	 */
-	public static class JoinOL extends JoinFunction
-	{
-		@Override
-		public void join(Record order, Record lineitem, Collector<Record> out) throws Exception {
-			lineitem.setField(0, order.getField(1, IntValue.class));
-			out.collect(lineitem);
-		}
-	}
-
-	/**
-	 * Returns (0 = custkey, 1 = custName, 2 = extPrice * (1-discount), 3 = balance, 4 = nationkey, 5 = address, 6 = phone, 7 = comment)
-	 */
-	public static class JoinCOL extends JoinFunction
-	{
-		private final DoubleValue d = new DoubleValue();
-		
-		@Override
-		public void join(Record custRecord, Record olRecord, Collector<Record> out) throws Exception
-		{
-			final Tuple t = olRecord.getField(1, Tuple.class);
-			final double extPrice = Double.parseDouble(t.getStringValueAt(0));
-			final double discount = Double.parseDouble(t.getStringValueAt(1));
-			
-			this.d.setValue(extPrice * (1 - discount));
-			custRecord.setField(2, this.d);
-			out.collect(custRecord);
-		}
-
-	}
-	
-	/**
-	 * Returns (0 = custkey, 1 = custName, 2 = extPrice * (1-discount), 3 = balance, 4 = nationName, 5 = address, 6 = phone, 7 = comment)
-	 */
-	public static class JoinNCOL extends JoinFunction
-	{
-		@Override
-		public void join(Record colRecord, Record nation, Collector<Record> out) throws Exception {
-			colRecord.setField(4, nation.getField(1, StringValue.class));
-			out.collect(colRecord);
-		}
-	}
-	
-	@ReduceOperator.Combinable
-	public static class Sum extends ReduceFunction
-	{
-		private final DoubleValue d = new DoubleValue();
-		
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception
-		{
-			Record record = null;
-			double sum = 0;
-			while (records.hasNext()) {
-				record = records.next();
-				sum += record.getField(2, DoubleValue.class).getValue();
-			}
-		
-			this.d.setValue(sum);
-			record.setField(2, this.d);
-			out.collect(record);
-		}
-		
-		@Override
-		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
-			reduce(records,out);
-		}
-	}
-
-	public static class TupleOutputFormat extends FileOutputFormat {
-		private static final long serialVersionUID = 1L;
-		
-		private final DecimalFormat formatter;
-		private final StringBuilder buffer = new StringBuilder();
-		
-		public TupleOutputFormat() {
-			DecimalFormatSymbols decimalFormatSymbol = new DecimalFormatSymbols();
-			decimalFormatSymbol.setDecimalSeparator('.');
-			
-			this.formatter = new DecimalFormat("#.####");
-			this.formatter.setDecimalFormatSymbols(decimalFormatSymbol);
-		}
-		
-		@Override
-		public void writeRecord(Record record) throws IOException
-		{
-			this.buffer.setLength(0);
-			this.buffer.append(record.getField(0, IntValue.class).toString()).append('|');
-			this.buffer.append(record.getField(1, StringValue.class).toString()).append('|');
-			
-			this.buffer.append(this.formatter.format(record.getField(2, DoubleValue.class).getValue())).append('|');
-			
-			this.buffer.append(record.getField(3, StringValue.class).toString()).append('|');
-			this.buffer.append(record.getField(4, StringValue.class).toString()).append('|');
-			this.buffer.append(record.getField(5, StringValue.class).toString()).append('|');
-			this.buffer.append(record.getField(6, StringValue.class).toString()).append('|');
-			this.buffer.append(record.getField(7, StringValue.class).toString()).append('|');
-			
-			this.buffer.append('\n');
-			
-			final byte[] bytes = this.buffer.toString().getBytes();
-			this.stream.write(bytes);
-		}
-	}
-
-	@Override
-	public String getDescription() {
-		return "TPC-H Query 10";
-	}
-
-	@Override
-	public Plan getPlan(String... args) throws IllegalArgumentException {
-		final String ordersPath;
-		final String lineitemsPath;
-		final String customersPath;
-		final String nationsPath;
-		final String resultPath;
-		
-		final int parallelism;
-
-		if (args.length < 6) {
-			throw new IllegalArgumentException("Invalid number of parameters");
-		} else {
-			parallelism = Integer.parseInt(args[0]);
-			ordersPath = args[1];
-			lineitemsPath = args[2];
-			customersPath = args[3];
-			nationsPath = args[4];
-			resultPath = args[5];
-		}
-		
-		FileDataSource orders = new FileDataSource(new IntTupleDataInFormat(), ordersPath, "Orders");
-		// orders.setOutputContract(UniqueKey.class);
-		// orders.getCompilerHints().setAvgNumValuesPerKey(1);
-
-		FileDataSource lineitems = new FileDataSource(new IntTupleDataInFormat(), lineitemsPath, "LineItems");
-		// lineitems.getCompilerHints().setAvgNumValuesPerKey(4);
-
-		FileDataSource customers = new FileDataSource(new IntTupleDataInFormat(), customersPath, "Customers");
-
-		FileDataSource nations = new FileDataSource(new IntTupleDataInFormat(), nationsPath, "Nations");
-
-
-		MapOperator mapO = MapOperator.builder(FilterO.class)
-			.name("FilterO")
-			.build();
-
-		MapOperator mapLi = MapOperator.builder(FilterLI.class)
-			.name("FilterLi")
-			.build();
-
-		MapOperator projectC = MapOperator.builder(ProjectC.class)
-			.name("ProjectC")
-			.build();
-
-		MapOperator projectN = MapOperator.builder(ProjectN.class)
-			.name("ProjectN")
-			.build();
-
-		JoinOperator joinOL = JoinOperator.builder(JoinOL.class, IntValue.class, 0, 0)
-			.name("JoinOL")
-			.build();
-
-		JoinOperator joinCOL = JoinOperator.builder(JoinCOL.class, IntValue.class, 0, 0)
-			.name("JoinCOL")
-			.build();
-
-		JoinOperator joinNCOL = JoinOperator.builder(JoinNCOL.class, IntValue.class, 4, 0)
-			.name("JoinNCOL")
-			.build();
-
-		ReduceOperator reduce = ReduceOperator.builder(Sum.class)
-			.keyField(IntValue.class, 0) 
-			.keyField(StringValue.class, 1)
-			.keyField(StringValue.class, 3)
-			.keyField(StringValue.class, 4)
-			.keyField(StringValue.class, 5)
-			.keyField(StringValue.class, 6)
-			.keyField(StringValue.class, 7)
-			.name("Reduce")
-			.build();
-
-		FileDataSink result = new FileDataSink(new TupleOutputFormat(), resultPath, "Output");
-
-		result.setInput(reduce);
-		
-		reduce.setInput(joinNCOL);
-		
-		joinNCOL.setFirstInput(joinCOL);
-		joinNCOL.setSecondInput(projectN);
-		
-		joinCOL.setFirstInput(projectC);
-		joinCOL.setSecondInput(joinOL);
-		
-		joinOL.setFirstInput(mapO);
-		joinOL.setSecondInput(mapLi);
-		
-		projectC.setInput(customers);
-		projectN.setInput(nations);
-		mapLi.setInput(lineitems);
-		mapO.setInput(orders);
-
-		// return the PACT plan
-		Plan p = new Plan(result, "TPCH Q10");
-		p.setDefaultParallelism(parallelism);
-		return p;
-	}
-}
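
The Sum reducer above groups on a seven-field composite key built with repeated keyField calls. In the DataSet API, the counterpart is a groupBy with several field positions; a sketch with illustrative sample rows standing in for the output of the deleted JoinCOL UDF:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class RevenueAggregationSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (custkey, custname, extendedprice * (1 - discount)) rows
        DataSet<Tuple3<Integer, String, Double>> joined = env.fromElements(
            new Tuple3<>(1, "Customer#1", 95.0),
            new Tuple3<>(1, "Customer#1", 5.0),
            new Tuple3<>(2, "Customer#2", 42.0));

        // composite grouping key (custkey, custname), summing the revenue term
        joined.groupBy(0, 1).sum(2).print(); // (1,Customer#1,100.0), (2,Customer#2,42.0)
    }
}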

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
deleted file mode 100644
index cebe6f9..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-/**
- * TPC-H is a decision support benchmark on relational data.
- * Its documentation and the data generator (DBGEN) can be found
- * at http://www.tpc.org/tpch/. This implementation is tested with
- * the DB2 data format.
- * 
- * This program implements a modified version of query 3 of
- * the TPC-H benchmark, including one join, some filtering, and an
- * aggregation.
- * 
- * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
- *   FROM orders, lineitem
- *   WHERE l_orderkey = o_orderkey
- *     AND o_orderstatus = "X" 
- *     AND YEAR(o_orderdate) > Y
- *     AND o_orderpriority LIKE "Z%"
- * GROUP BY l_orderkey, o_shippriority;
- */
-@SuppressWarnings("deprecation")
-public class TPCHQuery3 implements Program, ProgramDescription {
-
-	private static final long serialVersionUID = 1L;
-	
-	public static final String YEAR_FILTER = "parameter.YEAR_FILTER";
-	public static final String PRIO_FILTER = "parameter.PRIO_FILTER";
-
-	/**
-	 * Map PACT implements the selection and projection on the orders table.
-	 */
-	@ConstantFields({0,1})
-	public static class FilterO extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		private String prioFilter;		// filter literal for the order priority
-		private int yearFilter;			// filter literal for the year
-		
-		// reusable objects for the fields touched in the mapper
-		private StringValue orderStatus;
-		private StringValue orderDate;
-		private StringValue orderPrio;
-		
-		/**
-		 * Reads the filter literals from the configuration.
-		 * 
-		 * @see org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)
-		 */
-		@Override
-		public void open(Configuration parameters) {
-			this.yearFilter = parameters.getInteger(YEAR_FILTER, 1990);
-			this.prioFilter = parameters.getString(PRIO_FILTER, "0");
-		}
-	
-		/**
-		 * Filters the orders table by year, order status and order priority.
-		 *
-		 *  o_orderstatus = "X" 
-		 *  AND YEAR(o_orderdate) > Y
-		 *  AND o_orderpriority LIKE "Z"
-		 *  
-		 * Output Schema: 
-		 *   0:ORDERKEY, 
-		 *   1:SHIPPRIORITY
-		 */
-		@Override
-		public void map(final Record record, final Collector<Record> out) {
-			orderStatus = record.getField(2, StringValue.class);
-			if (!orderStatus.getValue().equals("F")) {
-				return;
-			}
-			
-			orderPrio = record.getField(4, StringValue.class);
-			if(!orderPrio.getValue().startsWith(this.prioFilter)) {
-				return;
-			}
-			
-			orderDate = record.getField(3, StringValue.class);
-			if (!(Integer.parseInt(orderDate.getValue().substring(0, 4)) > this.yearFilter)) {
-				return;
-			}
-			
-			record.setNumFields(2);
-			out.collect(record);
-		}
-	}
-
-	/**
-	 * Match PACT realizes the join between the LineItem and Order tables.
-	 *
-	 */
-	@ConstantFieldsFirst({0,1})
-	public static class JoinLiO extends JoinFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		/**
-		 * Implements the join between the LineItem and Order tables on the order key.
-		 * 
-		 * Output Schema:
-		 *   0:ORDERKEY
-		 *   1:SHIPPRIORITY
-		 *   2:EXTENDEDPRICE
-		 */
-		@Override
-		public void join(Record order, Record lineitem, Collector<Record> out) {
-			order.setField(2, lineitem.getField(1, DoubleValue.class));
-			out.collect(order);
-		}
-	}
-
-	/**
-	 * Reduce PACT implements the sum aggregation.
-	 * The Combinable annotation is set because the partial sums can already
-	 * be calculated in the combiner.
-	 *
-	 */
-	@Combinable
-	@ConstantFields({0,1})
-	public static class AggLiO extends ReduceFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		private final DoubleValue extendedPrice = new DoubleValue();
-		
-		/**
-		 * Implements the sum aggregation.
-		 * 
-		 * Output Schema:
-		 *   0:ORDERKEY
-		 *   1:SHIPPRIORITY
-		 *   2:SUM(EXTENDEDPRICE)
-		 */
-		@Override
-		public void reduce(Iterator<Record> values, Collector<Record> out) {
-			Record rec = null;
-			double partExtendedPriceSum = 0;
-
-			while (values.hasNext()) {
-				rec = values.next();
-				partExtendedPriceSum += rec.getField(2, DoubleValue.class).getValue();
-			}
-
-			this.extendedPrice.setValue(partExtendedPriceSum);
-			rec.setField(2, this.extendedPrice);
-			out.collect(rec);
-		}
-
-		/**
-		 * Creates partial sums on the price attribute for each data batch.
-		 */
-		@Override
-		public void combine(Iterator<Record> values, Collector<Record> out) {
-			reduce(values, out);
-		}
-	}
-
-
-	@Override
-	public Plan getPlan(final String... args) {
-		// parse program parameters
-		final int numSubtasks       = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		final String ordersPath    = (args.length > 1 ? args[1] : "");
-		final String lineitemsPath = (args.length > 2 ? args[2] : "");
-		final String output        = (args.length > 3 ? args[3] : "");
-
-		// create DataSourceContract for Orders input
-		FileDataSource orders = new FileDataSource(new CsvInputFormat(), ordersPath, "Orders");
-		CsvInputFormat.configureRecordFormat(orders)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.field(LongValue.class, 0)		// order id
-			.field(IntValue.class, 7) 		// ship prio
-			.field(StringValue.class, 2, 2)	// order status
-			.field(StringValue.class, 4, 10)	// order date
-			.field(StringValue.class, 5, 8);	// order prio
-
-		// create DataSourceContract for LineItems input
-		FileDataSource lineitems = new FileDataSource(new CsvInputFormat(), lineitemsPath, "LineItems");
-		CsvInputFormat.configureRecordFormat(lineitems)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.field(LongValue.class, 0)		// order id
-			.field(DoubleValue.class, 5);	// extended price
-
-		// create MapOperator for filtering Orders tuples
-		MapOperator filterO = MapOperator.builder(new FilterO())
-			.input(orders)
-			.name("FilterO")
-			.build();
-		// filter configuration
-		filterO.setParameter(YEAR_FILTER, 1993);
-		filterO.setParameter(PRIO_FILTER, "5");
-		// compiler hints
-		filterO.getCompilerHints().setFilterFactor(0.05f);
-
-		// create JoinOperator for joining Orders and LineItems
-		JoinOperator joinLiO = JoinOperator.builder(new JoinLiO(), LongValue.class, 0, 0)
-			.input1(filterO)
-			.input2(lineitems)
-			.name("JoinLiO")
-			.build();
-
-		// create ReduceOperator for aggregating the result
-		// the reducer has a composite key, consisting of the fields 0 and 1
-		ReduceOperator aggLiO = ReduceOperator.builder(new AggLiO())
-			.keyField(LongValue.class, 0)
-			.keyField(StringValue.class, 1)
-			.input(joinLiO)
-			.name("AggLio")
-			.build();
-
-		// create DataSinkContract for writing the result
-		FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, aggLiO, "Output");
-		CsvOutputFormat.configureRecordFormat(result)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.lenient(true)
-			.field(LongValue.class, 0)
-			.field(IntValue.class, 1)
-			.field(DoubleValue.class, 2);
-		
-		// assemble the PACT plan
-		Plan plan = new Plan(result, "TPCH Q3");
-		plan.setDefaultParallelism(numSubtasks);
-		return plan;
-	}
-
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [numSubTasks], [orders], [lineitem], [output]";
-	}
-}
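
FilterO above reads its filter literals from the Configuration in open(). The DataSet API counterpart is a rich function plus withParameters; a sketch under that assumption, with the key name mirroring the YEAR_FILTER constant and everything else illustrative:

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class ParameterizedFilterSketch {

    // reads its filter literal from the Configuration in open(), like FilterO
    public static final class YearFilter extends RichFilterFunction<Integer> {
        private int yearFilter;

        @Override
        public void open(Configuration parameters) {
            this.yearFilter = parameters.getInteger("parameter.YEAR_FILTER", 1990);
        }

        @Override
        public boolean filter(Integer year) {
            return year > yearFilter;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> orderYears = env.fromElements(1992, 1993, 1994, 1995);

        Configuration config = new Configuration();
        config.setInteger("parameter.YEAR_FILTER", 1993);

        orderYears.filter(new YearFilter()).withParameters(config).print(); // 1994, 1995
    }
}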

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java
deleted file mode 100644
index 157e3cf..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3.AggLiO;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3.FilterO;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3.JoinLiO;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * TPC-H is a decision support benchmark on relational data.
- * Its documentation and the data generator (DBGEN) can be found
- * at http://www.tpc.org/tpch/. This implementation is tested with
- * the DB2 data format.
- * The PACT program implements a modified version of query 3 of
- * the TPC-H benchmark, including one join, some filtering, and an
- * aggregation.
- * 
- * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
- *   FROM orders, lineitem
- *   WHERE l_orderkey = o_orderkey
- *     AND o_orderstatus = "X" 
- *     AND YEAR(o_orderdate) > Y
- *     AND o_orderpriority LIKE "Z%"
- * GROUP BY l_orderkey, o_shippriority;
- */
-@SuppressWarnings("deprecation")
-public class TPCHQuery3Unioned implements Program, ProgramDescription {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Plan getPlan(final String... args) {
-		// parse program parameters
-		final int numSubtasks       = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		String orders1Path    = (args.length > 1 ? args[1] : "");
-		String orders2Path    = (args.length > 2 ? args[2] : "");
-		String partJoin1Path    = (args.length > 3 ? args[3] : "");
-		String partJoin2Path    = (args.length > 4 ? args[4] : "");
-		
-		String lineitemsPath = (args.length > 5 ? args[5] : "");
-		String output        = (args.length > 6 ? args[6] : "");
-
-		// create DataSourceContract for Orders input
-		FileDataSource orders1 = new FileDataSource(new CsvInputFormat(), orders1Path, "Orders 1");
-		CsvInputFormat.configureRecordFormat(orders1)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.field(LongValue.class, 0)		// order id
-			.field(IntValue.class, 7) 		// ship prio
-			.field(StringValue.class, 2, 2)	// order status
-			.field(StringValue.class, 4, 10)	// order date
-			.field(StringValue.class, 5, 8);	// order prio
-		
-		FileDataSource orders2 = new FileDataSource(new CsvInputFormat(), orders2Path, "Orders 2");
-		CsvInputFormat.configureRecordFormat(orders2)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.field(LongValue.class, 0)		// order id
-			.field(IntValue.class, 7) 		// ship prio
-			.field(StringValue.class, 2, 2)	// order status
-			.field(StringValue.class, 4, 10)	// order date
-			.field(StringValue.class, 5, 8);	// order prio
-		
-		// create DataSourceContract for LineItems input
-		FileDataSource lineitems = new FileDataSource(new CsvInputFormat(), lineitemsPath, "LineItems");
-		CsvInputFormat.configureRecordFormat(lineitems)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.field(LongValue.class, 0)
-			.field(DoubleValue.class, 5);
-
-		// create MapOperator for filtering Orders tuples
-		MapOperator filterO1 = MapOperator.builder(new FilterO())
-			.name("FilterO")
-			.input(orders1)
-			.build();
-		// filter configuration
-		filterO1.setParameter(TPCHQuery3.YEAR_FILTER, 1993);
-		filterO1.setParameter(TPCHQuery3.PRIO_FILTER, "5");
-		filterO1.getCompilerHints().setFilterFactor(0.05f);
-		
-		// create MapOperator for filtering Orders tuples
-		MapOperator filterO2 = MapOperator.builder(new FilterO())
-			.name("FilterO")
-			.input(orders2)
-			.build();
-		// filter configuration
-		filterO2.setParameter(TPCHQuery3.YEAR_FILTER, 1993);
-		filterO2.setParameter(TPCHQuery3.PRIO_FILTER, "5");
-
-		// create JoinOperator for joining Orders and LineItems
-		@SuppressWarnings("unchecked")
-		JoinOperator joinLiO = JoinOperator.builder(new JoinLiO(), LongValue.class, 0, 0)
-			.input1(filterO2, filterO1)
-			.input2(lineitems)
-			.name("JoinLiO")
-			.build();
-		
-		FileDataSource partJoin1 = new FileDataSource(new CsvInputFormat(), partJoin1Path, "Part Join 1");
-		CsvInputFormat.configureRecordFormat(partJoin1)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.field(LongValue.class, 0)
-			.field(IntValue.class, 1)
-			.field(DoubleValue.class, 2);
-		
-		FileDataSource partJoin2 = new FileDataSource(new CsvInputFormat(), partJoin2Path, "Part Join 2");
-		CsvInputFormat.configureRecordFormat(partJoin2)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.field(LongValue.class, 0)
-			.field(IntValue.class, 1)
-			.field(DoubleValue.class, 2);
-		
-		// create ReduceOperator for aggregating the result
-		// the reducer has a composite key, consisting of the fields 0 and 1
-		@SuppressWarnings("unchecked")
-		ReduceOperator aggLiO = ReduceOperator.builder(new AggLiO())
-			.keyField(LongValue.class, 0)
-			.keyField(StringValue.class, 1)
-			.input(joinLiO, partJoin2, partJoin1)
-			.name("AggLio")
-			.build();
-
-		// create DataSinkContract for writing the result
-		FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, aggLiO, "Output");
-		CsvOutputFormat.configureRecordFormat(result)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.lenient(true)
-			.field(LongValue.class, 0)
-			.field(IntValue.class, 1)
-			.field(DoubleValue.class, 2);
-		
-		// assemble the PACT plan
-		Plan plan = new Plan(result, "TPCH Q3 Unioned");
-		plan.setDefaultParallelism(numSubtasks);
-		return plan;
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [numSubTasks], [orders1], [orders2], [partJoin1], [partJoin2], [lineitem], [output]";
-	}
-}
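
The variant above feeds several sources into one operator (input(joinLiO, partJoin2, partJoin1)), which the Record API treats as an implicit union. In the DataSet API the union is explicit; a sketch with illustrative inline data:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnionedInputsSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, Double>> joined = env.fromElements(new Tuple2<>(1L, 1.0));
        DataSet<Tuple2<Long, Double>> partJoin1 = env.fromElements(new Tuple2<>(1L, 2.0));
        DataSet<Tuple2<Long, Double>> partJoin2 = env.fromElements(new Tuple2<>(2L, 3.0));

        // explicit union replaces the multi-input wiring of the Record API plan
        joined.union(partJoin1).union(partJoin2)
            .groupBy(0)
            .sum(1)
            .print(); // (1,3.0), (2,3.0)
    }
}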

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java
deleted file mode 100644
index ec3c5b4..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
-import org.apache.flink.test.recordJobs.util.StringTupleDataOutFormat;
-import org.apache.flink.test.recordJobs.util.Tuple;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of the TPC-H Query 4 as a Flink program.
- */
-
-@SuppressWarnings({"serial", "deprecation"})
-public class TPCHQuery4 implements Program, ProgramDescription {
-
-	private static Logger LOG = LoggerFactory.getLogger(TPCHQuery4.class);
-	
-	private int parallelism = 1;
-	private String ordersInputPath;
-	private String lineItemInputPath;
-	private String outputPath;
-	
-	
-	/**
-	 * Small {@link MapFunction} to filter out the irrelevant orders.
-	 *
-	 */
-	//@SameKey
-	public static class OFilter extends MapFunction {
-
-		private final String dateParamString = "1995-01-01";
-		private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-		private final GregorianCalendar gregCal = new GregorianCalendar();
-		
-		private Date paramDate;
-		private Date plusThreeMonths;
-		
-		@Override
-		public void open(Configuration parameters) {				
-			try {
-				this.paramDate = sdf.parse(this.dateParamString);
-				this.plusThreeMonths = getPlusThreeMonths(paramDate);
-				
-			} catch (ParseException e) {
-				throw new RuntimeException(e);
-			}
-		}
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			Tuple tuple = record.getField(1, Tuple.class);
-			Date orderDate;
-			
-			String orderStringDate = tuple.getStringValueAt(4);
-			
-			try {
-				orderDate = sdf.parse(orderStringDate);
-			} catch (ParseException e) {
-				throw new RuntimeException(e);
-			}
-			
-			if(paramDate.before(orderDate) && plusThreeMonths.after(orderDate)) {
-				out.collect(record);
-			}
-
-		}
-
-		/**
-		 * Calculates the {@link Date} which is three months after the given one.
-		 * @param paramDate of type {@link Date}.
-		 * @return a {@link Date} three months later.
-		 */
-		private Date getPlusThreeMonths(Date paramDate) {
-			
-			gregCal.setTime(paramDate);
-			gregCal.add(Calendar.MONTH, 3);
-			Date plusThreeMonths = gregCal.getTime();
-			return plusThreeMonths;
-		}
-	}
-	
-	/**
-	 * Simple filter for the line item selection. It filters out all the tuples that do
-	 * not satisfy the &quot;l_commitdate &lt; l_receiptdate&quot; condition.
-	 * 
-	 */
-	//@SameKey
-	public static class LiFilter extends MapFunction {
-
-		private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			Tuple tuple = record.getField(1, Tuple.class);
-			String commitString = tuple.getStringValueAt(11);
-			String receiptString = tuple.getStringValueAt(12);
-
-			Date commitDate;
-			Date receiptDate;
-			
-			try {
-				commitDate = sdf.parse(commitString);
-				receiptDate = sdf.parse(receiptString);
-			} catch (ParseException e) {
-				throw new RuntimeException(e);
-			}
-
-			if (commitDate.before(receiptDate)) {
-				out.collect(record);
-			}
-
-		}
-	}
-	
-	/**
-	 * Implements the equijoin on the orderkey and performs the projection on 
-	 * the order priority as well.
-	 *
-	 */
-	public static class JoinLiO extends JoinFunction {
-		
-		@Override
-		public void join(Record order, Record line, Collector<Record> out)
-				throws Exception {
-			Tuple orderTuple = order.getField(1, Tuple.class);
-			
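-			// keep only the order priority (field 5; 32 == 1 << 5 in the projection bitmask)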
-			orderTuple.project(32);
-			String newOrderKey = orderTuple.getStringValueAt(0);
-			
-			order.setField(0, new StringValue(newOrderKey));
-			out.collect(order);
-		}
-	}
-	
-	/**
-	 * Implements the count(*) part. 
-	 *
-	 */
-	//@SameKey
-	public static class CountAgg extends ReduceFunction {
-		
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {	
-			long count = 0;
-			Record rec = null;
-			
-			while(records.hasNext()) {
-			 	rec = records.next();
-			 	count++;
-			}
-			
-			if (rec != null) {
-				Tuple tuple = new Tuple();
-				tuple.addAttribute("" + count);
-				rec.setField(1, tuple);
-				out.collect(rec);
-			}
-		}
-	}
-	
-
-	@Override
-	public Plan getPlan(String... args) throws IllegalArgumentException {
-		
-		if (args == null || args.length != 4) {
-			LOG.warn("number of arguments does not match!");
-			this.ordersInputPath = "";
-			this.lineItemInputPath = "";
-			this.outputPath = "";
-		} else {
-			setArgs(args);
-		}
-		
-		FileDataSource orders = 
-			new FileDataSource(new IntTupleDataInFormat(), this.ordersInputPath, "Orders");
-		orders.setParallelism(this.parallelism);
-		//orders.setOutputContract(UniqueKey.class);
-		
-		FileDataSource lineItems =
-			new FileDataSource(new IntTupleDataInFormat(), this.lineItemInputPath, "LineItems");
-		lineItems.setParallelism(this.parallelism);
-		
-		FileDataSink result = 
-				new FileDataSink(new StringTupleDataOutFormat(), this.outputPath, "Output");
-		result.setParallelism(parallelism);
-		
-		MapOperator lineFilter = 
-				MapOperator.builder(LiFilter.class)
-			.name("LineItemFilter")
-			.build();
-		lineFilter.setParallelism(parallelism);
-		
-		MapOperator ordersFilter = 
-				MapOperator.builder(OFilter.class)
-			.name("OrdersFilter")
-			.build();
-		ordersFilter.setParallelism(parallelism);
-		
-		JoinOperator join = 
-				JoinOperator.builder(JoinLiO.class, IntValue.class, 0, 0)
-			.name("OrdersLineitemsJoin")
-			.build();
-		join.setParallelism(parallelism);
-		
-		ReduceOperator aggregation = 
-				ReduceOperator.builder(CountAgg.class, StringValue.class, 0)
-			.name("AggregateGroupBy")
-			.build();
-		aggregation.setParallelism(this.parallelism);
-		
-		lineFilter.setInput(lineItems);
-		ordersFilter.setInput(orders);
-		join.setFirstInput(ordersFilter);
-		join.setSecondInput(lineFilter);
-		aggregation.setInput(join);
-		result.setInput(aggregation);
-		
-		return new Plan(result, "TPC-H 4");
-	}
-
-	/**
-	 * Parses the arguments into the member variables.
-	 * @param args the program arguments
-	 */
-	private void setArgs(String[] args) {
-		this.parallelism = Integer.parseInt(args[0]);
-		this.ordersInputPath = args[1];
-		this.lineItemInputPath = args[2];
-		this.outputPath = args[3];
-	}
-
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [parallelism] [orders-input] [lineitem-input] [output]";
-	}
-
-}

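For comparison, the filter-join-count pattern of the removed job can be written against the Java DataSet API that supersedes the Record API. The following is a minimal, self-contained sketch and not part of this commit: the class name, the inlined sample data, and the field layouts are illustrative, and the date filters are assumed to have been applied upstream.

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class TPCHQuery4Sketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// (orderkey, orderpriority) of orders that fall into the three-month window
		DataSet<Tuple2<Integer, String>> orders = env.fromElements(
			new Tuple2<>(1, "1-URGENT"), new Tuple2<>(2, "3-MEDIUM"));

		// orderkeys of line items with l_commitdate < l_receiptdate
		DataSet<Tuple2<Integer, Integer>> lateItems = env.fromElements(
			new Tuple2<>(1, 1), new Tuple2<>(2, 1), new Tuple2<>(2, 2));

		DataSet<Tuple2<String, Long>> counts = orders
			.join(lateItems).where(0).equalTo(0)
			.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, Integer>, Tuple2<String, Long>>() {
				@Override
				public Tuple2<String, Long> join(Tuple2<Integer, String> order, Tuple2<Integer, Integer> item) {
					// emit (orderpriority, 1) per matching pair, as JoinLiO/CountAgg did
					return new Tuple2<String, Long>(order.f1, 1L);
				}
			})
			.groupBy(0) // group by order priority
			.sum(1);    // count the matches

		counts.print(); // print() also triggers execution of the plan
	}
}

Like the removed Record job, this counts joined records per priority rather than distinct orders, and it leaves reading the real inputs (e.g. via env.readCsvFile) out of scope.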
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java
deleted file mode 100644
index c00d231..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.recordJobs.relational.query9Util.AmountAggregate;
-import org.apache.flink.test.recordJobs.relational.query9Util.FilteredPartsJoin;
-import org.apache.flink.test.recordJobs.relational.query9Util.IntPair;
-import org.apache.flink.test.recordJobs.relational.query9Util.LineItemMap;
-import org.apache.flink.test.recordJobs.relational.query9Util.OrderMap;
-import org.apache.flink.test.recordJobs.relational.query9Util.OrderedPartsJoin;
-import org.apache.flink.test.recordJobs.relational.query9Util.PartFilter;
-import org.apache.flink.test.recordJobs.relational.query9Util.PartJoin;
-import org.apache.flink.test.recordJobs.relational.query9Util.PartListJoin;
-import org.apache.flink.test.recordJobs.relational.query9Util.PartsuppMap;
-import org.apache.flink.test.recordJobs.relational.query9Util.StringIntPair;
-import org.apache.flink.test.recordJobs.relational.query9Util.StringIntPairStringDataOutFormat;
-import org.apache.flink.test.recordJobs.relational.query9Util.SupplierMap;
-import org.apache.flink.test.recordJobs.relational.query9Util.SuppliersJoin;
-import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
-import org.apache.flink.types.IntValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Quote from the TPC-H homepage:
- * "The TPC-H is a decision support benchmark on relational data.
- * Its documentation and the data generator (DBGEN) can be found
- * on http://www.tpc.org/tpch/."
- * This implementation is tested with the DB2 data format.
- * This PACT program implements query 9 of the TPC-H benchmark:
- * 
- * <pre>
- * select nation, o_year, sum(amount) as sum_profit
- * from (
- *   select n_name as nation, extract(year from o_orderdate) as o_year,
- *          l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
- *   from part, supplier, lineitem, partsupp, orders, nation
- *   where
- *     s_suppkey = l_suppkey and ps_suppkey = l_suppkey and ps_partkey = l_partkey
- *     and p_partkey = l_partkey and o_orderkey = l_orderkey and s_nationkey = n_nationkey
- *     and p_name like '%[COLOR]%'
- * ) as profit
- * group by nation, o_year
- * order by nation, o_year desc;
- * </pre>
- * 
- * Plan:<br>
- * Match "part" and "partsupp" on "partkey" -> "parts" with (partkey, suppkey) as key
- * Match "orders" and "lineitem" on "orderkey" -> "ordered_parts" with (partkey, suppkey) as key
- * Match "parts" and "ordered_parts" on (partkey, suppkey) -> "filtered_parts" with "suppkey" as key
- * Match "supplier" and "nation" on "nationkey" -> "suppliers" with "suppkey" as key
- * Match "filtered_parts" and "suppliers" on "suppkey" -> "partlist" with (nation, o_year) as key
- * Group "partlist" by (nation, o_year), calculate sum(amount)
- * 
- * <b>Attention:</b> The "order by" part is not implemented!
- * 
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class TPCHQuery9 implements Program, ProgramDescription {
-	public final String ARGUMENTS = "parallelism partInputPath partSuppInputPath ordersInputPath lineItemInputPath supplierInputPath nationInputPath outputPath";
-
-	private static final Logger LOG = LoggerFactory.getLogger(TPCHQuery9.class);
-
-	private int parallelism = 1;
-
-	private String partInputPath, partSuppInputPath, ordersInputPath, lineItemInputPath, supplierInputPath,
-			nationInputPath;
-
-	private String outputPath;
-
-
-	@Override
-	public Plan getPlan(String... args) throws IllegalArgumentException {
-
-		if (args.length != 8) {
-			LOG.warn("number of arguments does not match!");
-			
-			this.parallelism = 1;
-			this.partInputPath = "";
-			this.partSuppInputPath = "";
-			this.ordersInputPath = "";
-			this.lineItemInputPath = "";
-			this.supplierInputPath = "";
-			this.nationInputPath = "";
-			this.outputPath = "";
-		} else {
-			this.parallelism = Integer.parseInt(args[0]);
-			this.partInputPath = args[1];
-			this.partSuppInputPath = args[2];
-			this.ordersInputPath = args[3];
-			this.lineItemInputPath = args[4];
-			this.supplierInputPath = args[5];
-			this.nationInputPath = args[6];
-			this.outputPath = args[7];
-		}
-		
-		/* Create the 6 data sources: */
-		/* part: (partkey | name, mfgr, brand, type, size, container, retailprice, comment) */
-		FileDataSource partInput = new FileDataSource(
-			new IntTupleDataInFormat(), this.partInputPath, "\"part\" source");
-		//partInput.setOutputContract(UniqueKey.class);
-//		partInput.getCompilerHints().setAvgNumValuesPerKey(1);
-
-		/* partsupp: (partkey | suppkey, availqty, supplycost, comment) */
-		FileDataSource partSuppInput = new FileDataSource(
-			new IntTupleDataInFormat(), this.partSuppInputPath, "\"partsupp\" source");
-
-		/* orders: (orderkey | custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment) */
-		FileDataSource ordersInput = new FileDataSource(
-			new IntTupleDataInFormat(), this.ordersInputPath, "\"orders\" source");
-		//ordersInput.setOutputContract(UniqueKey.class);
-//		ordersInput.getCompilerHints().setAvgNumValuesPerKey(1);
-
-		/* lineitem: (orderkey | partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, ...) */
-		FileDataSource lineItemInput = new FileDataSource(
-			new IntTupleDataInFormat(), this.lineItemInputPath, "\"lineitem\" source");
-
-		/* supplier: (suppkey | name, address, nationkey, phone, acctbal, comment) */
-		FileDataSource supplierInput = new FileDataSource(
-			new IntTupleDataInFormat(), this.supplierInputPath, "\"supplier\" source");
-		//supplierInput.setOutputContract(UniqueKey.class);
-//		supplierInput.getCompilerHints().setAvgNumValuesPerKey(1);
-
-		/* nation: (nationkey | name, regionkey, comment) */
-		FileDataSource nationInput = new FileDataSource(
-			new IntTupleDataInFormat(), this.nationInputPath, "\"nation\" source");
-		//nationInput.setOutputContract(UniqueKey.class);
-//		nationInput.getCompilerHints().setAvgNumValuesPerKey(1);
-
-		/* Filter on part's name, project values to NULL: */
-		MapOperator filterPart = MapOperator.builder(PartFilter.class)
-			.name("filterParts")
-			.build();
-
-		/* Map to change the key element of partsupp, project value to (supplycost, suppkey): */
-		MapOperator mapPartsupp = MapOperator.builder(PartsuppMap.class)
-			.name("mapPartsupp")
-			.build();
-
-		/* Map to extract the year from order: */
-		MapOperator mapOrder = MapOperator.builder(OrderMap.class)
-			.name("mapOrder")
-			.build();
-
-		/* Project value to (partkey, suppkey, quantity, price = extendedprice*(1-discount)): */
-		MapOperator mapLineItem = MapOperator.builder(LineItemMap.class)
-			.name("proj.LineItem")
-			.build();
-
-		/* Change the key of supplier to nationkey, project value to suppkey: */
-		MapOperator mapSupplier = MapOperator.builder(SupplierMap.class)
-			.name("proj.Supplier")
-			.build();
-
-		/* Equijoin on partkey of part and partsupp: */
-		JoinOperator partsJoin = JoinOperator.builder(PartJoin.class, IntValue.class, 0, 0)
-			.name("partsJoin")
-			.build();
-
-		/* Equijoin on orderkey of orders and lineitem: */
-		JoinOperator orderedPartsJoin =
-			JoinOperator.builder(OrderedPartsJoin.class, IntValue.class, 0, 0)
-			.name("orderedPartsJoin")
-			.build();
-
-		/* Equijoin on nationkey of supplier and nation: */
-		JoinOperator suppliersJoin =
-			JoinOperator.builder(SuppliersJoin.class, IntValue.class, 0, 0)
-			.name("suppliersJoin")
-			.build();
-
-		/* Equijoin on (partkey,suppkey) of parts and orderedParts: */
-		JoinOperator filteredPartsJoin =
-			JoinOperator.builder(FilteredPartsJoin.class, IntPair.class, 0, 0)
-			.name("filteredPartsJoin")
-			.build();
-
-		/* Equijoin on suppkey of filteredParts and suppliers: */
-		JoinOperator partListJoin =
-			JoinOperator.builder(PartListJoin.class, IntValue.class, 0, 0)
-			.name("partlistJoin")
-			.build();
-
-		/* Aggregate sum(amount) by (nation,year): */
-		ReduceOperator sumAmountAggregate =
-			ReduceOperator.builder(AmountAggregate.class, StringIntPair.class, 0)
-			.name("groupBy")
-			.build();
-
-		/* Connect input filters: */
-		filterPart.setInput(partInput);
-		mapPartsupp.setInput(partSuppInput);
-		mapOrder.setInput(ordersInput);
-		mapLineItem.setInput(lineItemInput);
-		mapSupplier.setInput(supplierInput);
-
-		/* Connect equijoins: */
-		partsJoin.setFirstInput(filterPart);
-		partsJoin.setSecondInput(mapPartsupp);
-		orderedPartsJoin.setFirstInput(mapOrder);
-		orderedPartsJoin.setSecondInput(mapLineItem);
-		suppliersJoin.setFirstInput(mapSupplier);
-		suppliersJoin.setSecondInput(nationInput);
-		filteredPartsJoin.setFirstInput(partsJoin);
-		filteredPartsJoin.setSecondInput(orderedPartsJoin);
-		partListJoin.setFirstInput(filteredPartsJoin);
-		partListJoin.setSecondInput(suppliersJoin);
-
-		/* Connect aggregate: */
-		sumAmountAggregate.setInput(partListJoin);
-
-		/* Connect sink: */
-		FileDataSink result = new FileDataSink(new StringIntPairStringDataOutFormat(), this.outputPath, "Results sink");
-		result.setInput(sumAmountAggregate);
-
-		Plan p = new Plan(result, "TPC-H query 9");
-		p.setDefaultParallelism(this.parallelism);
-		return p;
-	}
-
-	@Override
-	public String getDescription() {
-		return "TPC-H query 9, parameters: " + this.ARGUMENTS;
-	}
-}

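The removed Q9 plan boils down to five equijoins followed by a grouped sum. Below is a hedged sketch of just the final aggregation step with the DataSet API, under the assumption that the upstream joins have already produced (nation, o_year, amount) triples; the class name and sample values are illustrative, not part of the commit.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class TPCHQuery9AggregateSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// (nation, o_year, amount) with
		// amount = l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity
		DataSet<Tuple3<String, Integer, Double>> profit = env.fromElements(
			new Tuple3<>("GERMANY", 1995, 1201.50),
			new Tuple3<>("GERMANY", 1995, 98.25),
			new Tuple3<>("FRANCE", 1996, 543.10));

		// sum(amount) grouped by the composite key (nation, o_year);
		// as in the removed job, the "order by" clause is not implemented
		profit.groupBy(0, 1).sum(2).print();
	}
}

Each "Match" step of the plan above would become a .join(other).where(...).equalTo(...) call on the corresponding DataSets, so the five-way join chains naturally.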
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java
deleted file mode 100644
index a681f64..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.relational;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecondExcept;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-/**
- * The TPC-H is a decision support benchmark on relational data.
- * Its documentation and the data generator (DBGEN) can be found
- * on http://www.tpc.org/tpch/. This implementation is tested with
- * the DB2 data format.
- * 
- * This program implements a query on the TPC-H schema 
- * including one join and an aggregation.
- * This query is used as example in the Asterix project (http://asterix.ics.uci.edu/).
- * 
- * SELECT c_mktsegment, COUNT(o_orderkey)
- *   FROM orders, customer
- *   WHERE c_custkey = o_custkey
- * GROUP BY c_mktsegment;
- * 
- */
-@SuppressWarnings("deprecation")
-public class TPCHQueryAsterix implements Program, ProgramDescription {
-
-	private static final long serialVersionUID = 1L;
-
-
-	/**
-	 * Realizes the join between the Customer and Orders tables.
-	 */
-	@ConstantFieldsSecondExcept(0)
-	public static class JoinCO extends JoinFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private final IntValue one = new IntValue(1);
-		
-		/**
-		 * Output Schema:
-		 *  0: PARTIAL_COUNT=1
-		 *  1: C_MKTSEGMENT
-		 */
-		@Override
-		public void join(Record order, Record cust, Collector<Record> out)
-				throws Exception {
-			cust.setField(0, one);
-			out.collect(cust);
-		}
-	}
-
-	/**
-	 * Implements the aggregation of the results. The
-	 * Combinable annotation is set because the partial counts can already
-	 * be computed in the combiner.
-	 *
-	 */
-	@Combinable
-	@ConstantFields(1)
-	public static class AggCO extends ReduceFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private final IntValue integer = new IntValue();
-		private Record record = new Record();
-	
-		/**
-		 * Output Schema:
-		 * 0: COUNT
-		 * 1: C_MKTSEGMENT
-		 *
-		 */
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out)
-				throws Exception {
-
-			int count = 0;
-
-			while (records.hasNext()) {
-				record = records.next();
-				count += record.getField(0, integer).getValue();
-			}
-
-			integer.setValue(count);
-			record.setField(0, integer);
-			out.collect(record);
-		}
-		
-		/**
-		 * Computes partial counts
-		 */
-		public void combine(Iterator<Record> records, Collector<Record> out)
-				throws Exception {
-			reduce(records, out);
-		}
-
-	}
-
-
-	@Override
-	public Plan getPlan(final String... args) {
-
-		// parse program parameters
-		int numSubtasks       = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		String ordersPath    = (args.length > 1 ? args[1] : "");
-		String customerPath  = (args.length > 2 ? args[2] : "");
-		String output        = (args.length > 3 ? args[3] : "");
-
-		/*
-		 * Output Schema:
-		 * 0: CUSTOMER_ID
-		 */
-		// create DataSourceContract for Orders input
-		FileDataSource orders = new FileDataSource(new CsvInputFormat(), ordersPath, "Orders");
-		orders.setParallelism(numSubtasks);
-		CsvInputFormat.configureRecordFormat(orders)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.field(IntValue.class, 1);
-		
-		/*
-		 * Output Schema:
-		 * 0: CUSTOMER_ID
-		 * 1: MKT_SEGMENT
-		 */
-		// create DataSourceContract for Customer input
-		FileDataSource customers = new FileDataSource(new CsvInputFormat(), customerPath, "Customers");
-		customers.setParallelism(numSubtasks);
-		CsvInputFormat.configureRecordFormat(customers)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.field(IntValue.class, 0)
-			.field(StringValue.class, 6);
-		
-		// create JoinOperator for joining Orders and Customers
-		JoinOperator joinCO = JoinOperator.builder(new JoinCO(), IntValue.class, 0, 0)
-			.name("JoinCO")
-			.build();
-		joinCO.setParallelism(numSubtasks);
-
-		// create ReduceOperator for aggregating the result
-		ReduceOperator aggCO = ReduceOperator.builder(new AggCO(), StringValue.class, 1)
-			.name("AggCO")
-			.build();
-		aggCO.setParallelism(numSubtasks);
-
-		// create DataSinkContract for writing the result
-		FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, "Output");
-		result.setParallelism(numSubtasks);
-		CsvOutputFormat.configureRecordFormat(result)
-			.recordDelimiter('\n')
-			.fieldDelimiter('|')
-			.field(IntValue.class, 0)
-			.field(StringValue.class, 1);
-
-		// assemble the plan
-		result.setInput(aggCO);
-		aggCO.setInput(joinCO);
-		joinCO.setFirstInput(orders);
-		joinCO.setSecondInput(customers);
-
-		return new Plan(result, "TPCH Asterix");
-	}
-
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [numSubTasks], [orders], [customer], [output]";
-	}
-}

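The Asterix query maps onto the DataSet API particularly directly. A minimal sketch, not part of this commit, with inlined sample rows standing in for the two CSV sources; class name and data are illustrative:

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;

public class TPCHQueryAsterixSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// o_custkey of each order
		DataSet<Tuple1<Integer>> orders = env.fromElements(
			new Tuple1<>(1), new Tuple1<>(1), new Tuple1<>(2));

		// (c_custkey, c_mktsegment)
		DataSet<Tuple2<Integer, String>> customers = env.fromElements(
			new Tuple2<>(1, "BUILDING"), new Tuple2<>(2, "MACHINERY"));

		orders.join(customers).where(0).equalTo(0)
			.with(new JoinFunction<Tuple1<Integer>, Tuple2<Integer, String>, Tuple2<String, Long>>() {
				@Override
				public Tuple2<String, Long> join(Tuple1<Integer> order, Tuple2<Integer, String> cust) {
					// (C_MKTSEGMENT, partial count 1), mirroring JoinCO's output schema
					return new Tuple2<String, Long>(cust.f1, 1L);
				}
			})
			.groupBy(0) // group by market segment
			.sum(1)     // sum() pre-aggregates on the map side, which is what @Combinable requested
			.print();
	}
}

Reading the real inputs would use env.readCsvFile with '|' as the field delimiter and the same included fields as the removed CsvInputFormat configuration.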
