hawq-commits mailing list archives

From r..@apache.org
Subject [43/51] [partial] incubator-hawq git commit: SGA import
Date Sat, 19 Sep 2015 00:36:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/gp_sparse_vector/operators.c
----------------------------------------------------------------------
diff --git a/contrib/gp_sparse_vector/operators.c b/contrib/gp_sparse_vector/operators.c
new file mode 100644
index 0000000..3273f1e
--- /dev/null
+++ b/contrib/gp_sparse_vector/operators.c
@@ -0,0 +1,920 @@
+/**
+ * @file
+ * This module defines a collection of operators for svecs. The functions
+ * are usually wrappers that call the corresponding operators defined for
+ * SparseData.
+ */
+
+#include <postgres.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <math.h>
+
+#include "utils/array.h"
+#include "catalog/pg_type.h"
+#include "utils/numeric.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "access/hash.h"
+
+#include "sparse_vector.h"
+
+#ifndef NO_PG_MODULE_MAGIC
+PG_MODULE_MAGIC;
+#endif
+
+/**
+ * For many functions defined in this module, the operation has no meaning
+ * if the array dimensions aren't the same, unless one of the inputs is a
+ * scalar. This routine checks that condition.
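+ * For example, adding a 5-dimensional svec to a scalar is fine, but
+ * adding a 5-dimensional svec to a 3-dimensional svec raises an error.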
+ */
+void check_dimension(SvecType *svec1, SvecType *svec2, char *msg) {
+	if ((!IS_SCALAR(svec1)) &&
+	    (!IS_SCALAR(svec2)) &&
+	    (svec1->dimension != svec2->dimension)) {
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("%s: array dimensions of the inputs are not the same: dim1=%d, dim2=%d",
+				msg, svec1->dimension, svec2->dimension)));
+	}
+}
+
+/**
+ * Dot Product of two svec types
+ */
+double svec_svec_dot_product(SvecType *svec1, SvecType *svec2) {
+	SparseData left  = sdata_from_svec(svec1);
+	SparseData right = sdata_from_svec(svec2);
+
+	check_dimension(svec1,svec2,"svec_svec_dot_product");
+	return sum_sdata_values_double( op_sdata_by_sdata(multiply,left,right));
+}
+
+/**
+ *  svec_concat_replicate - replicates an svec multiple times
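+ *    For example, replicating '{1,2}:{0,5}'::svec (dense form {0,5,5})
+ *    three times gives an svec whose dense form is {0,5,5,0,5,5,0,5,5}.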
+ */
+Datum svec_concat_replicate(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1( svec_concat_replicate);
+
+Datum svec_concat_replicate(PG_FUNCTION_ARGS)
+{
+	int multiplier = PG_GETARG_INT32(0);
+	if (multiplier < 0)
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("multiplier cannot be negative")));
+
+	SvecType *svec = PG_GETARG_SVECTYPE_P(1);
+	SparseData rep  = sdata_from_svec(svec);
+	SparseData sdata = concat_replicate(rep, multiplier);
+
+	PG_RETURN_SVECTYPE_P(svec_from_sparsedata(sdata,true));
+}
+
+/**
+ *  svec_concat - concatenates two svecs
+ */
+Datum svec_concat(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1( svec_concat );
+Datum svec_concat(PG_FUNCTION_ARGS)
+{
+	if (PG_ARGISNULL(0) && (!PG_ARGISNULL(1)))
+		PG_RETURN_SVECTYPE_P(PG_GETARG_SVECTYPE_P(1));
+	else if (PG_ARGISNULL(0) && PG_ARGISNULL(1))
+		PG_RETURN_NULL();
+	else if (PG_ARGISNULL(1))
+		PG_RETURN_SVECTYPE_P(PG_GETARG_SVECTYPE_P(0));
+
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	SparseData left = sdata_from_svec(svec1);
+	SparseData right = sdata_from_svec(svec2);
+	SparseData sdata = concat(left, right);
+
+	PG_RETURN_SVECTYPE_P(svec_from_sparsedata(sdata,true));
+}
+
+/**
+ *  svec_eq - returns the equality of two svecs
+ */
+Datum svec_eq(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1( svec_eq );
+Datum svec_eq(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	SparseData left  = sdata_from_svec(svec1);
+	SparseData right = sdata_from_svec(svec2);
+	PG_RETURN_BOOL(sparsedata_eq(left,right));
+}
+
+/*
+ * Svec comparison functions based on the l2 norm
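+ * For example, '{1}:{3.}'::svec sorts after '{2}:{2.}'::svec under this
+ * ordering, since their l2 norms are 3 and sqrt(8) ~ 2.83. The internal
+ * comparator returns the sentinel value -5 when either norm is an NVP;
+ * the SQL-callable wrappers below map that to a NULL result.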
+ */
+static int32_t svec_l2_cmp_internal(SvecType *svec1, SvecType *svec2)
+{
+	SparseData left  = sdata_from_svec(svec1);
+	SparseData right = sdata_from_svec(svec2);
+	double magleft  = l2norm_sdata_values_double(left);
+	double magright = l2norm_sdata_values_double(right);
+	int result;
+
+	if (IS_NVP(magleft) || IS_NVP(magright))
+		return -5;
+
+	if (magleft < magright) result = -1;
+	else if (magleft > magright) result = 1;
+	else result = 0;
+
+	return result;
+}
+Datum svec_l2_cmp(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1( svec_l2_cmp );
+Datum svec_l2_cmp(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	int result = svec_l2_cmp_internal(svec1,svec2);
+
+	if (result == -5) PG_RETURN_NULL();
+
+	PG_RETURN_INT32(result);
+}
+Datum svec_l2_lt(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1( svec_l2_lt );
+Datum svec_l2_lt(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	int result = svec_l2_cmp_internal(svec1,svec2);
+
+	if (result == -5) PG_RETURN_NULL();
+
+	PG_RETURN_BOOL((result == -1) ? 1 : 0);
+}
+Datum svec_l2_le(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1( svec_l2_le );
+Datum svec_l2_le(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	int result = svec_l2_cmp_internal(svec1,svec2);
+
+	if (result == -5) PG_RETURN_NULL();
+
+	PG_RETURN_BOOL(((result == -1)||(result == 0)) ? 1 : 0);
+}
+Datum svec_l2_eq(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1( svec_l2_eq );
+Datum svec_l2_eq(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	int result = svec_l2_cmp_internal(svec1,svec2);
+
+	if (result == -5) PG_RETURN_NULL();
+
+	PG_RETURN_BOOL((result == 0) ? 1 : 0);
+}
+Datum svec_l2_ne(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1( svec_l2_ne );
+Datum svec_l2_ne(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	int result = svec_l2_cmp_internal(svec1,svec2);
+
+	if (result == -5) PG_RETURN_NULL();
+
+	PG_RETURN_BOOL((result != 0) ? 1 : 0);
+}
+Datum svec_l2_gt(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1( svec_l2_gt );
+Datum svec_l2_gt(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	int result = svec_l2_cmp_internal(svec1,svec2);
+
+	if (result == -5) PG_RETURN_NULL();
+
+	PG_RETURN_BOOL((result == 1) ? 1 : 0);
+}
+Datum svec_l2_ge(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1( svec_l2_ge );
+Datum svec_l2_ge(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	int result = svec_l2_cmp_internal(svec1,svec2);
+
+	if (result == -5) PG_RETURN_NULL();
+
+	PG_RETURN_BOOL(((result == 0) || (result == 1)) ? 1 : 0);
+}
+
+/**
+ * Performs one of subtract, add, multiply, or divide depending on value
+ * of operation.
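+ * The scalar_args value uses the check_scalar() encoding from
+ * sparse_vector.h: 0 = neither input is a scalar, 1 = only the left,
+ * 2 = only the right, 3 = both.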
+ */
+SvecType * svec_operate_on_sdata_pair(int scalar_args, enum operation_t op,
+				      SparseData left, SparseData right)
+{
+	SparseData sdata = NULL;
+	float8 *left_vals = (float8 *)(left->vals->data);
+	float8 *right_vals = (float8 *)(right->vals->data);
+	float8 data_result;
+
+	switch (scalar_args) {
+	case 0: 		//neither arg is scalar
+		sdata = op_sdata_by_sdata(op,left,right);
+		break;
+	case 1:			//left arg is scalar
+		sdata=op_sdata_by_scalar_copy(op,(char *)left_vals,right,false);
+		break;
+	case 2:			//right arg is scalar
+		sdata=op_sdata_by_scalar_copy(op,(char *)right_vals,left,true);
+		break;
+	case 3:			//both args are scalar
+		switch (op) {
+		case subtract:
+			data_result = left_vals[0] - right_vals[0];
+			break;
+		case multiply:
+			data_result = left_vals[0] * right_vals[0];
+			break;
+		case divide:
+			data_result = left_vals[0] / right_vals[0];
+			break;
+		case add:
+		default:
+			data_result = left_vals[0] + right_vals[0];
+			break;
+		}
+		return svec_make_scalar(data_result);
+		break;
+	}
+	return svec_from_sparsedata(sdata,true);
+}
+
+
+SvecType * op_svec_by_svec_internal(enum operation_t op, SvecType *svec1, SvecType *svec2)
+{
+	SparseData left  = sdata_from_svec(svec1);
+	SparseData right = sdata_from_svec(svec2);
+
+	int scalar_args = check_scalar(IS_SCALAR(svec1),IS_SCALAR(svec2));
+
+	return svec_operate_on_sdata_pair(scalar_args,op,left,right);
+}
+
+/*
+ * Do exponentiation, only makes sense if the left is a vector and the right
+ * is a scalar or if both are scalar
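+ * For example, '{1,2}:{20.,10.}'::svec ^ '{1}:{3.}'::svec yields
+ * {1,2}:{8000,1000}, i.e. every element is raised to the third power.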
+ */
+static SvecType *
+pow_svec_by_scalar_internal(SvecType *svec1, SvecType *svec2)
+{
+	SparseData left  = sdata_from_svec(svec1);
+	SparseData right = sdata_from_svec(svec2);
+	SparseData sdata = NULL;
+	double *left_vals=(double *)(left->vals->data);
+	double *right_vals=(double *)(right->vals->data);
+	double data_result;
+
+	int scalar_args = check_scalar(IS_SCALAR(svec1),IS_SCALAR(svec2));
+
+	switch(scalar_args) {
+	case 0: 		//neither arg is scalar
+	case 1:			//left arg is scalar
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("Svec exponentiation is undefined when the right argument is a vector")));
+		break;
+	case 2:			//right arg is scalar
+		if (right_vals[0] == 2.)		// the squared case
+			sdata = square_sdata(left);
+		else if (right_vals[0] == 3.)	// the cubed case
+			sdata = cube_sdata(left);
+		else if (right_vals[0] == 4.)	// the quad case
+			sdata = quad_sdata(left);
+		else
+			sdata = pow_sdata_by_scalar(left,(char *)right_vals);
+		break;
+	case 3:			//both args are scalar
+		data_result = pow(left_vals[0],right_vals[0]);
+		return svec_make_scalar(data_result);
+		break;
+	}
+	return svec_from_sparsedata(sdata,true);
+}
+
+PG_FUNCTION_INFO_V1( svec_pow );
+Datum svec_pow(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	check_dimension(svec1,svec2,"svec_pow");
+	SvecType *result = pow_svec_by_scalar_internal(svec1,svec2);
+	PG_RETURN_SVECTYPE_P(result);
+}
+
+PG_FUNCTION_INFO_V1( svec_minus );
+Datum svec_minus(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	check_dimension(svec1,svec2,"svec_minus");
+	SvecType *result = op_svec_by_svec_internal(subtract,svec1,svec2);
+	PG_RETURN_SVECTYPE_P(result);
+}
+
+PG_FUNCTION_INFO_V1( svec_plus );
+Datum svec_plus(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	check_dimension(svec1,svec2,"svec_plus");
+	SvecType *result = op_svec_by_svec_internal(add,svec1,svec2);
+	PG_RETURN_SVECTYPE_P(result);
+}
+
+PG_FUNCTION_INFO_V1( svec_mult );
+Datum svec_mult(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	check_dimension(svec1,svec2,"svec_mult");
+	SvecType *result = op_svec_by_svec_internal(multiply,svec1,svec2);
+	PG_RETURN_SVECTYPE_P(result);
+}
+
+PG_FUNCTION_INFO_V1( svec_div );
+Datum svec_div(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+	check_dimension(svec1,svec2,"svec_div");
+	SvecType *result = op_svec_by_svec_internal(divide,svec1,svec2);
+	PG_RETURN_SVECTYPE_P(result);
+}
+
+PG_FUNCTION_INFO_V1( svec_dot );
+/**
+ *  svec_dot - computes the dot product of two svecs
+ */
+Datum svec_dot(PG_FUNCTION_ARGS)
+{
+	SvecType *svec1 = PG_GETARG_SVECTYPE_P(0);
+	SvecType *svec2 = PG_GETARG_SVECTYPE_P(1);
+
+	double accum = svec_svec_dot_product( svec1, svec2);
+
+	if (IS_NVP(accum)) PG_RETURN_NULL();
+
+	PG_RETURN_FLOAT8(accum);
+}
+
+/*
+ * Cast from int2,int4,int8,float4,float8 scalar to SvecType
+ */
+PG_FUNCTION_INFO_V1( svec_cast_int2 );
+Datum svec_cast_int2(PG_FUNCTION_ARGS) {
+	float8 value=(float8 )PG_GETARG_INT16(0);
+	PG_RETURN_SVECTYPE_P(svec_make_scalar(value));
+}
+PG_FUNCTION_INFO_V1( svec_cast_int4 );
+Datum svec_cast_int4(PG_FUNCTION_ARGS) {
+	float8 value=(float8 )PG_GETARG_INT32(0);
+	PG_RETURN_SVECTYPE_P(svec_make_scalar(value));
+}
+PG_FUNCTION_INFO_V1( svec_cast_int8 );
+Datum svec_cast_int8(PG_FUNCTION_ARGS) {
+	float8 value=(float8 )PG_GETARG_INT64(0);
+	PG_RETURN_SVECTYPE_P(svec_make_scalar(value));
+}
+PG_FUNCTION_INFO_V1( svec_cast_float4 );
+Datum svec_cast_float4(PG_FUNCTION_ARGS) {
+	float8 value=(float8 )PG_GETARG_FLOAT4(0);
+	PG_RETURN_SVECTYPE_P(svec_make_scalar(value));
+}
+PG_FUNCTION_INFO_V1( svec_cast_float8 );
+Datum svec_cast_float8(PG_FUNCTION_ARGS) {
+	float8 value=PG_GETARG_FLOAT8(0);
+	PG_RETURN_SVECTYPE_P(svec_make_scalar(value));
+}
+PG_FUNCTION_INFO_V1( svec_cast_numeric );
+Datum svec_cast_numeric(PG_FUNCTION_ARGS) {
+	Datum num=PG_GETARG_DATUM(0);
+	float8 value;
+	value = DatumGetFloat8(DirectFunctionCall1(numeric_float8_no_overflow,num));
+	PG_RETURN_SVECTYPE_P(svec_make_scalar(value));
+}
+
+/*
+ * Cast from int2,int4,int8,float4,float8 scalar to float8[]
+ */
+PG_FUNCTION_INFO_V1( float8arr_cast_int2 );
+Datum float8arr_cast_int2(PG_FUNCTION_ARGS) {
+	float8 value=(float8 )PG_GETARG_INT16(0);
+	PG_RETURN_ARRAYTYPE_P(svec_return_array_internal(svec_make_scalar(value)));
+}
+PG_FUNCTION_INFO_V1( float8arr_cast_int4 );
+Datum float8arr_cast_int4(PG_FUNCTION_ARGS) {
+	float8 value=(float8 )PG_GETARG_INT32(0);
+	PG_RETURN_ARRAYTYPE_P(svec_return_array_internal(svec_make_scalar(value)));
+}
+PG_FUNCTION_INFO_V1( float8arr_cast_int8 );
+Datum float8arr_cast_int8(PG_FUNCTION_ARGS) {
+	float8 value=(float8 )PG_GETARG_INT64(0);
+	PG_RETURN_ARRAYTYPE_P(svec_return_array_internal(svec_make_scalar(value)));
+}
+PG_FUNCTION_INFO_V1( float8arr_cast_float4 );
+Datum float8arr_cast_float4(PG_FUNCTION_ARGS) {
+	float8 value=(float8 )PG_GETARG_FLOAT4(0);
+	PG_RETURN_ARRAYTYPE_P(svec_return_array_internal(svec_make_scalar(value)));
+}
+PG_FUNCTION_INFO_V1( float8arr_cast_float8 );
+Datum float8arr_cast_float8(PG_FUNCTION_ARGS) {
+	float8 value=PG_GETARG_FLOAT8(0);
+	PG_RETURN_ARRAYTYPE_P(svec_return_array_internal(svec_make_scalar(value)));
+}
+PG_FUNCTION_INFO_V1( float8arr_cast_numeric );
+Datum float8arr_cast_numeric(PG_FUNCTION_ARGS) {
+	Datum num=PG_GETARG_DATUM(0);
+	float8 value;
+	value = DatumGetFloat8(DirectFunctionCall1(numeric_float8_no_overflow,num));
+	PG_RETURN_ARRAYTYPE_P(svec_return_array_internal(svec_make_scalar(value)));
+}
+
+/** Constructs a 1-dimensional svec from a float8, marked as a scalar */
+SvecType *svec_make_scalar(float8 value) {
+	SparseData sdata = float8arr_to_sdata(&value,1);
+	SvecType *result = svec_from_sparsedata(sdata,true);
+	result->dimension = -1;
+	return result;
+}
+
+
+PG_FUNCTION_INFO_V1( svec_cast_float8arr );
+/**
+ *  svec_cast_float8arr - turns a float8 array into an svec
+ */
+Datum svec_cast_float8arr(PG_FUNCTION_ARGS) {
+	ArrayType *A_PG = PG_GETARG_ARRAYTYPE_P(0);
+	SvecType *output_svec;
+	float8 *array_temp;
+	bits8 *bitmap;
+	int bitmask;
+	int i,j;
+
+	if (ARR_ELEMTYPE(A_PG) != FLOAT8OID)
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("svec_cast_float8arr only defined over float8[]")));
+	if (ARR_NDIM(A_PG) != 1)
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("svec_cast_float8arr only defined over 1 dimensional arrays")));
+
+	/* Extract array */
+	int dimension = ARR_DIMS(A_PG)[0];
+	float8 *array = (float8 *)ARR_DATA_PTR(A_PG);
+
+	/* If the data array has NULLs, then we need to create an array to
+	 * store the NULL values as NVP values defined in float_specials.h.
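	 * For example, the array {1, NULL, 3} is copied as {1, NVP, 3},
+	 * where NVP is the NaN-based "no value present" marker.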
+	 */
+	if (ARR_HASNULL(A_PG)) {
+		array_temp = array;
+		array = (double *)palloc(sizeof(float8) * dimension);
+		bitmap = ARR_NULLBITMAP(A_PG);
+		bitmask = 1;
+		j = 0;
+		for (i=0; i<dimension; i++) {
+			if (bitmap && (*bitmap & bitmask) == 0) // if NULL
+				array[i] = NVP;
+			else {
+				array[i] = array_temp[j];
+				j++;
+			}
+			if (bitmap) { // advance bitmap pointer
+				bitmask <<= 1;
+				if (bitmask == 0x100) {
+					bitmap++;
+					bitmask = 1;
+				}
+			}
+		}
+	 }
+
+	/* Create the output SVEC */
+	SparseData sdata = float8arr_to_sdata(array,dimension);
+	output_svec = svec_from_sparsedata(sdata,true);
+
+	if (ARR_HASNULL(A_PG))
+		pfree(array);
+
+	PG_RETURN_SVECTYPE_P(output_svec);
+}
+
+PG_FUNCTION_INFO_V1( svec_cast_positions_float8arr );
+/**
+ *  svec_cast_positions_float8arr - turns a pair of arrays, the first an int8[]
+ *    denoting positions and the second a float8[] denoting values, into an
+ *    svec of a given size with a given default value everywhere else.
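+ *    For example (an illustrative sketch): positions '{1,3,5}', values
+ *    '{2.0,4.0,6.0}', size 6 and base value 0.0 yield an svec whose dense
+ *    form is {2,0,4,0,6,0}. Positions are 1-based and must be ascending.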
+ */
+Datum svec_cast_positions_float8arr(PG_FUNCTION_ARGS) {
+	ArrayType *B_PG = PG_GETARG_ARRAYTYPE_P(0);
+	ArrayType *A_PG = PG_GETARG_ARRAYTYPE_P(1);
+	int64 size = PG_GETARG_INT64(2);
+	float8 base_value = PG_GETARG_FLOAT8(3);
+	SvecType *output_svec;
+	int i = 0;
+
+	if (ARR_ELEMTYPE(A_PG) != FLOAT8OID)
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("svec_cast_positions_float8arr values only defined over float8[]")));
+	if (ARR_NDIM(A_PG) != 1)
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("svec_cast_positions_float8arr only defined over 1 dimensional arrays")));
+
+	if (ARR_NULLBITMAP(A_PG))
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("svec_cast_positions_float8arr does not allow null bitmaps on arrays")));
+
+	if (ARR_ELEMTYPE(B_PG) != INT8OID)
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("svec_cast_positions_float8arr positions only defined over int8[]")));
+	if (ARR_NDIM(B_PG) != 1)
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("svec_cast_positions_float8arr only defined over 1 dimensional arrays")));
+
+	if (ARR_NULLBITMAP(B_PG))
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("svec_cast_positions_float8arr does not allow null bitmaps on arrays")));
+
+	/* Extract array */
+	int dimension = ARR_DIMS(A_PG)[0];
+	int dimension2 = ARR_DIMS(B_PG)[0];
+
+	if (dimension != dimension2)
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("svec_cast_positions_float8arr position and value vectors must be of the same size")));
+
+	float8 *array = (float8 *)ARR_DATA_PTR(A_PG);
+	int64 *array_pos =  (int64 *)ARR_DATA_PTR(B_PG);
+
+	/* Positions are assumed to be in ascending order, so the last
+	 * position is the largest one */
+	if (array_pos[dimension-1] > size)
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("svec_cast_positions_float8arr: some position values are larger than the declared maximum array size")));
+
+	for(i=0;i < dimension;++i){
+		if(array_pos[i] <= 0){
+			ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("svec_cast_positions_float8arr only accepts positions that are positive integers (x > 0)")));
+		}
+	}
+
+	/* Create the output SVEC */
+	SparseData sdata = position_to_sdata(array,array_pos,FLOAT8OID,dimension,size,base_value);
+	output_svec = svec_from_sparsedata(sdata,true);
+
+	PG_RETURN_SVECTYPE_P(output_svec);
+}
+
+/*
+ * Provide some operators for Postgres FLOAT8OID arrays
+ */
+/*
+ * Equality
+ */
+static bool float8arr_equals_internal(ArrayType *left, ArrayType *right)
+{
+	/*
+	 * Note that we are only defined for FLOAT8OID
+	 */
+	int dimleft = ARR_NDIM(left), dimright = ARR_NDIM(right);
+	int *dimsleft = ARR_DIMS(left), *dimsright = ARR_DIMS(right);
+	int numleft = ArrayGetNItems(dimleft,dimsleft);
+	int numright = ArrayGetNItems(dimright,dimsright);
+	double *vals_left = (double *)ARR_DATA_PTR(left);
+	double *vals_right = (double *)ARR_DATA_PTR(right);
+	bits8 *bitmap_left = ARR_NULLBITMAP(left);
+	bits8 *bitmap_right = ARR_NULLBITMAP(right);
+	int bitmask = 1;
+	int nvals = numleft;
+
+	if ((dimleft != dimright) || (numleft != numright))
+		return false;
+
+	/*
+	 * First check that the null bitmaps are equivalent: either both are
+	 * absent, or they mark exactly the same positions as NULL.
+	 */
+	if ((bitmap_left == NULL) != (bitmap_right == NULL))
+		return false;
+
+	if (bitmap_left)
+	{
+		for (int i=0; i<numleft; i++)
+		{
+			if ((*bitmap_left & bitmask) != (*bitmap_right & bitmask))
+				return false;
+			if ((*bitmap_left & bitmask) == 0)
+				nvals--;	/* NULL elements are not stored */
+			bitmask <<= 1;
+			if (bitmask == 0x100)
+			{
+				bitmap_left++;
+				bitmap_right++;
+				bitmask = 1;
+			}
+		}
+	}
+
+	/*
+	 * Now check the stored (non-NULL) array values for equality
+	 */
+	for (int i=0; i<nvals; i++)
+		if (vals_left[i] != vals_right[i]) return false;
+
+	return true;
+}
+
+/**
+ *  float8arr_equals - checks whether two float8 arrays are identical
+ */
+Datum float8arr_equals(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1( float8arr_equals);
+Datum float8arr_equals(PG_FUNCTION_ARGS) {
+	ArrayType *left  = PG_GETARG_ARRAYTYPE_P(0);
+	ArrayType *right = PG_GETARG_ARRAYTYPE_P(1);
+
+	PG_RETURN_BOOL(float8arr_equals_internal(left,right));
+}
+
+/*
+ * Returns a SparseData formed from a dense float8[] in uncompressed format.
+ * This is useful for creating a SparseData without processing that can be
+ * used by the SparseData processing routines.
+ */
+SparseData sdata_uncompressed_from_float8arr_internal(ArrayType *array)
+{
+        int dim = ARR_NDIM(array);
+        int *dims = ARR_DIMS(array);
+	int num = ArrayGetNItems(dim,dims);
+        double *vals =(double *)ARR_DATA_PTR(array);
+        bits8 *bitmap = ARR_NULLBITMAP(array);
+        int   bitmask=1;
+
+	/* Convert null items into NVPs */
+	if (bitmap)
+	{
+		int j = 0;
+		double *vals_temp = vals;
+		vals = (double *)palloc(sizeof(float8) * num);
+        	for (int i=0; i<num; i++)
+		{
+                	if ((*bitmap & bitmask) == 0) // if NULL
+				vals[i] = NVP;
+			else {
+				vals[i] = vals_temp[j];
+				j++;
+			}
+                        bitmask <<= 1;
+                        if (bitmask == 0x100)
+                        {
+                                bitmap++;
+                                bitmask = 1;
+                        }
+		}
+	}
+	/* Makes the SparseData; this relies on using NULL to represent a
+	 * count array of ones, as described in SparseData.h, after definition
+	 * of SparseDataStruct.
+	 */
+	SparseData result = makeInplaceSparseData(
+				 (char *)vals,NULL,
+				 num*sizeof(float8),0,FLOAT8OID,num,num);
+
+	return(result);
+}
+
+/**
+ *  float8arr_dot - computes the dot product of two float8 arrays
+ */
+Datum float8arr_dot(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1( float8arr_dot);
+Datum float8arr_dot(PG_FUNCTION_ARGS) {
+	ArrayType *arr_left   = PG_GETARG_ARRAYTYPE_P(0);
+	ArrayType *arr_right  = PG_GETARG_ARRAYTYPE_P(1);
+	SparseData left  = sdata_uncompressed_from_float8arr_internal(arr_left);
+	SparseData right = sdata_uncompressed_from_float8arr_internal(arr_right);
+	SparseData mult_result;
+	double accum;
+
+	mult_result = op_sdata_by_sdata(multiply,left,right);
+	accum = sum_sdata_values_double(mult_result);
+	freeSparseData(left);
+	freeSparseData(right);
+	freeSparseDataAndData(mult_result);
+
+	if (IS_NVP(accum)) PG_RETURN_NULL();
+
+	PG_RETURN_FLOAT8(accum);
+}
+
+/*
+ * Permute the basic operators (minus,plus,mult,div) between SparseData
+ * and float8[]
+ *
+ * For each function, make a version that takes the left and right args as
+ * each type (without copies)
+ */
+PG_FUNCTION_INFO_V1( float8arr_minus_float8arr );
+Datum
+float8arr_minus_float8arr(PG_FUNCTION_ARGS)
+{
+	ArrayType *arr1 = PG_GETARG_ARRAYTYPE_P(0);
+	ArrayType *arr2 = PG_GETARG_ARRAYTYPE_P(1);
+	SparseData left  = sdata_uncompressed_from_float8arr_internal(arr1);
+	SparseData right = sdata_uncompressed_from_float8arr_internal(arr2);
+	int scalar_args = check_scalar(SDATA_IS_SCALAR(left),SDATA_IS_SCALAR(right));
+	PG_RETURN_SVECTYPE_P(svec_operate_on_sdata_pair(scalar_args,subtract,left,right));
+}
+PG_FUNCTION_INFO_V1( svec_minus_float8arr );
+Datum
+svec_minus_float8arr(PG_FUNCTION_ARGS)
+{
+	SvecType *svec = PG_GETARG_SVECTYPE_P(0);
+	ArrayType *arr = PG_GETARG_ARRAYTYPE_P(1);
+	SparseData left = sdata_from_svec(svec);
+	SparseData right = sdata_uncompressed_from_float8arr_internal(arr);
+	int scalar_args = check_scalar(SDATA_IS_SCALAR(left),SDATA_IS_SCALAR(right));
+	PG_RETURN_SVECTYPE_P(svec_operate_on_sdata_pair(scalar_args,subtract,left,right));
+}
+PG_FUNCTION_INFO_V1( float8arr_minus_svec );
+Datum
+float8arr_minus_svec(PG_FUNCTION_ARGS)
+{
+	ArrayType *arr = PG_GETARG_ARRAYTYPE_P(0);
+	SvecType *svec = PG_GETARG_SVECTYPE_P(1);
+	SparseData left = sdata_uncompressed_from_float8arr_internal(arr);
+	SparseData right = sdata_from_svec(svec);
+	int scalar_args = check_scalar(SDATA_IS_SCALAR(left),SDATA_IS_SCALAR(right));
+	PG_RETURN_SVECTYPE_P(svec_operate_on_sdata_pair(scalar_args,subtract,left,right));
+}
+
+PG_FUNCTION_INFO_V1( float8arr_plus_float8arr );
+Datum
+float8arr_plus_float8arr(PG_FUNCTION_ARGS)
+{
+	ArrayType *arr1 = PG_GETARG_ARRAYTYPE_P(0);
+	ArrayType *arr2 = PG_GETARG_ARRAYTYPE_P(1);
+	SparseData left  = sdata_uncompressed_from_float8arr_internal(arr1);
+	SparseData right = sdata_uncompressed_from_float8arr_internal(arr2);
+	int scalar_args = check_scalar(SDATA_IS_SCALAR(left),SDATA_IS_SCALAR(right));
+	PG_RETURN_SVECTYPE_P(svec_operate_on_sdata_pair(scalar_args,add,left,right));
+}
+PG_FUNCTION_INFO_V1( svec_plus_float8arr );
+Datum
+svec_plus_float8arr(PG_FUNCTION_ARGS)
+{
+	SvecType *svec = PG_GETARG_SVECTYPE_P(0);
+	ArrayType *arr = PG_GETARG_ARRAYTYPE_P(1);
+	SparseData left = sdata_from_svec(svec);
+	SparseData right = sdata_uncompressed_from_float8arr_internal(arr);
+	int scalar_args = check_scalar(SDATA_IS_SCALAR(left),SDATA_IS_SCALAR(right));
+	PG_RETURN_SVECTYPE_P(svec_operate_on_sdata_pair(scalar_args,add,left,right));
+}
+PG_FUNCTION_INFO_V1( float8arr_plus_svec );
+Datum
+float8arr_plus_svec(PG_FUNCTION_ARGS)
+{
+	ArrayType *arr = PG_GETARG_ARRAYTYPE_P(0);
+	SvecType *svec = PG_GETARG_SVECTYPE_P(1);
+	SparseData left = sdata_uncompressed_from_float8arr_internal(arr);
+	SparseData right = sdata_from_svec(svec);
+	int scalar_args = check_scalar(SDATA_IS_SCALAR(left),SDATA_IS_SCALAR(right));
+	PG_RETURN_SVECTYPE_P(svec_operate_on_sdata_pair(scalar_args,add,left,right));
+}
+PG_FUNCTION_INFO_V1( float8arr_mult_float8arr );
+Datum
+float8arr_mult_float8arr(PG_FUNCTION_ARGS)
+{
+	ArrayType *arr1 = PG_GETARG_ARRAYTYPE_P(0);
+	ArrayType *arr2 = PG_GETARG_ARRAYTYPE_P(1);
+	SparseData left  = sdata_uncompressed_from_float8arr_internal(arr1);
+	SparseData right = sdata_uncompressed_from_float8arr_internal(arr2);
+	int scalar_args = check_scalar(SDATA_IS_SCALAR(left),SDATA_IS_SCALAR(right));
+	SvecType *svec = svec_operate_on_sdata_pair(scalar_args,multiply,left,right);
+	PG_RETURN_SVECTYPE_P(svec);
+}
+PG_FUNCTION_INFO_V1( svec_mult_float8arr );
+Datum
+svec_mult_float8arr(PG_FUNCTION_ARGS)
+{
+	SvecType *svec = PG_GETARG_SVECTYPE_P(0);
+	ArrayType *arr = PG_GETARG_ARRAYTYPE_P(1);
+	SparseData left = sdata_from_svec(svec);
+	SparseData right = sdata_uncompressed_from_float8arr_internal(arr);
+	int scalar_args = check_scalar(SDATA_IS_SCALAR(left),SDATA_IS_SCALAR(right));
+	SvecType *result = svec_operate_on_sdata_pair(scalar_args,multiply,left,right);
+	PG_RETURN_SVECTYPE_P(result);
+}
+PG_FUNCTION_INFO_V1( float8arr_mult_svec );
+Datum
+float8arr_mult_svec(PG_FUNCTION_ARGS)
+{
+	ArrayType *arr = PG_GETARG_ARRAYTYPE_P(0);
+	SvecType *svec = PG_GETARG_SVECTYPE_P(1);
+	SparseData left = sdata_uncompressed_from_float8arr_internal(arr);
+	SparseData right = sdata_from_svec(svec);
+	int scalar_args = check_scalar(SDATA_IS_SCALAR(left),SDATA_IS_SCALAR(right));
+	PG_RETURN_SVECTYPE_P(svec_operate_on_sdata_pair(scalar_args,multiply,left,right));
+}
+PG_FUNCTION_INFO_V1( float8arr_div_float8arr );
+Datum
+float8arr_div_float8arr(PG_FUNCTION_ARGS)
+{
+	ArrayType *arr1 = PG_GETARG_ARRAYTYPE_P(0);
+	ArrayType *arr2 = PG_GETARG_ARRAYTYPE_P(1);
+	SparseData left  = sdata_uncompressed_from_float8arr_internal(arr1);
+	SparseData right = sdata_uncompressed_from_float8arr_internal(arr2);
+	int scalar_args = check_scalar(SDATA_IS_SCALAR(left),SDATA_IS_SCALAR(right));
+	PG_RETURN_SVECTYPE_P(svec_operate_on_sdata_pair(scalar_args,divide,left,right));
+}
+PG_FUNCTION_INFO_V1( svec_div_float8arr );
+Datum
+svec_div_float8arr(PG_FUNCTION_ARGS)
+{
+	SvecType *svec = PG_GETARG_SVECTYPE_P(0);
+	ArrayType *arr = PG_GETARG_ARRAYTYPE_P(1);
+	SparseData left = sdata_from_svec(svec);
+	SparseData right = sdata_uncompressed_from_float8arr_internal(arr);
+	int scalar_args = check_scalar(SDATA_IS_SCALAR(left),SDATA_IS_SCALAR(right));
+	PG_RETURN_SVECTYPE_P(svec_operate_on_sdata_pair(scalar_args,divide,left,right));
+}
+PG_FUNCTION_INFO_V1( float8arr_div_svec );
+Datum
+float8arr_div_svec(PG_FUNCTION_ARGS)
+{
+	ArrayType *arr = PG_GETARG_ARRAYTYPE_P(0);
+	SvecType *svec = PG_GETARG_SVECTYPE_P(1);
+	SparseData left = sdata_uncompressed_from_float8arr_internal(arr);
+	SparseData right = sdata_from_svec(svec);
+	int scalar_args = check_scalar(SDATA_IS_SCALAR(left),SDATA_IS_SCALAR(right));
+	PG_RETURN_SVECTYPE_P(svec_operate_on_sdata_pair(scalar_args,divide,left,right));
+}
+PG_FUNCTION_INFO_V1( svec_dot_float8arr );
+Datum
+svec_dot_float8arr(PG_FUNCTION_ARGS)
+{
+	SvecType *svec = PG_GETARG_SVECTYPE_P(0);
+	ArrayType *arr = PG_GETARG_ARRAYTYPE_P(1);
+	SparseData right = sdata_uncompressed_from_float8arr_internal(arr);
+	SparseData left = sdata_from_svec(svec);
+	SparseData mult_result;
+	double accum;
+	mult_result = op_sdata_by_sdata(multiply,left,right);
+	accum = sum_sdata_values_double(mult_result);
+	freeSparseData(right);
+	freeSparseDataAndData(mult_result);
+
+	if (IS_NVP(accum)) PG_RETURN_NULL();
+
+	PG_RETURN_FLOAT8(accum);
+}
+PG_FUNCTION_INFO_V1( float8arr_dot_svec);
+Datum
+float8arr_dot_svec(PG_FUNCTION_ARGS)
+{
+	ArrayType *arr = PG_GETARG_ARRAYTYPE_P(0);
+	SvecType *svec = PG_GETARG_SVECTYPE_P(1);
+	SparseData left = sdata_uncompressed_from_float8arr_internal(arr);
+	SparseData right = sdata_from_svec(svec);
+	SparseData mult_result;
+	double accum;
+	mult_result = op_sdata_by_sdata(multiply,left,right);
+	accum = sum_sdata_values_double(mult_result);
+	freeSparseData(left);
+	freeSparseDataAndData(mult_result);
+
+	if (IS_NVP(accum)) PG_RETURN_NULL();
+
+	PG_RETURN_FLOAT8(accum);
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/gp_sparse_vector/sfv_test_output
----------------------------------------------------------------------
diff --git a/contrib/gp_sparse_vector/sfv_test_output b/contrib/gp_sparse_vector/sfv_test_output
new file mode 100644
index 0000000..7875189
--- /dev/null
+++ b/contrib/gp_sparse_vector/sfv_test_output
@@ -0,0 +1,141 @@
+-- \set FETCH_COUNT 100
+\timing
+Timing is on.
+DROP TABLE IF EXISTS features;
+DROP TABLE
+Time: 33.300 ms
+DROP TABLE IF EXISTS corpus;
+DROP TABLE
+Time: 10.930 ms
+DROP TABLE IF EXISTS documents;
+DROP TABLE
+Time: 11.157 ms
+DROP TABLE IF EXISTS dictionary;
+DROP TABLE
+Time: 0.556 ms
+-- Test simple document classification routines
+CREATE TABLE features (a text[][]) DISTRIBUTED RANDOMLY;
+CREATE TABLE
+Time: 23.411 ms
+INSERT INTO features values ('{am,before,being,bothered,corpus,document,i,in,is,me,never,now,one,really,second,the,third,this,until}');
+INSERT 0 1
+Time: 58.249 ms
+DROP TABLE IF EXISTS documents;
+DROP TABLE
+Time: 0.581 ms
+CREATE TABLE documents (docnum int, a text[]) DISTRIBUTED RANDOMLY;
+CREATE TABLE
+Time: 18.009 ms
+INSERT INTO documents values (1,'{this,is,one,document,in,the,corpus}');
+INSERT 0 1
+Time: 92.216 ms
+INSERT INTO documents values (2,'{i,am,the,second,document,in,the,corpus}');
+INSERT 0 1
+Time: 5.971 ms
+INSERT INTO documents values (3,'{being,third,never,really,bothered,me,until,now}');
+INSERT 0 1
+Time: 5.199 ms
+INSERT INTO documents values (4,'{the,document,before,me,is,the,third,document}');
+INSERT 0 1
+Time: 6.822 ms
+CREATE TABLE corpus (docnum int, a svec) DISTRIBUTED RANDOMLY;
+CREATE TABLE
+Time: 17.750 ms
+INSERT INTO corpus (SELECT docnum,gp_extract_feature_histogram((SELECT a FROM features LIMIT 1),a) FROM documents);
+INSERT 0 4
+Time: 77.271 ms
+\qecho Show the feature dictionary
+Show the feature dictionary
+SELECT a dictionary FROM features;
+                                               dictionary                                               
+--------------------------------------------------------------------------------------------------------
+ {am,before,being,bothered,corpus,document,i,in,is,me,never,now,one,really,second,the,third,this,until}
+(1 row)
+
+Time: 2.450 ms
+\qecho Show each document
+Show each document
+SELECT docnum Document_Number, a document FROM documents ORDER BY 1;
+ document_number |                     document                     
+-----------------+--------------------------------------------------
+               1 | {this,is,one,document,in,the,corpus}
+               2 | {i,am,the,second,document,in,the,corpus}
+               3 | {being,third,never,really,bothered,me,until,now}
+               4 | {the,document,before,me,is,the,third,document}
+(4 rows)
+
+Time: 1.895 ms
+\qecho The extracted feature vector for each document
+The extracted feature vector for each document
+SELECT docnum Document_Number, a::float8[] feature_vector FROM corpus ORDER BY 1;
+ document_number |             feature_vector              
+-----------------+-----------------------------------------
+               1 | {0,0,0,0,1,1,0,1,1,0,0,0,1,0,0,1,0,1,0}
+               2 | {1,0,0,0,1,1,1,1,0,0,0,0,0,0,1,2,0,0,0}
+               3 | {0,0,1,1,0,0,0,0,0,1,1,1,0,1,0,0,1,0,1}
+               4 | {0,1,0,0,0,2,0,0,1,1,0,0,0,0,0,2,1,0,0}
+(4 rows)
+
+Time: 2.378 ms
+\qecho Count the number of times each feature occurs at least once in all documents
+Count the number of times each feature occurs at least once in all documents
+SELECT (vec_count_nonzero(a))::float8[] count_in_document FROM corpus;
+            count_in_document            
+-----------------------------------------
+ {1,1,1,1,2,3,1,2,2,2,1,1,1,1,1,3,2,1,1}
+(1 row)
+
+Time: 2.600 ms
+\qecho Count all occurrences of each term in all documents
+Count all occurrences of each term in all documents
+SELECT (sum(a))::float8[] sum_in_document FROM corpus;
+             sum_in_document             
+-----------------------------------------
+ {1,1,1,1,2,4,1,2,2,2,1,1,1,1,1,5,2,1,1}
+(1 row)
+
+Time: 2.367 ms
+\qecho Calculate Term Frequency / Inverse Document Frequency
+Calculate Term Frequency / Inverse Document Frequency
+SELECT docnum, (a*logidf)::float8[] tf_idf FROM (SELECT log(count(a)/vec_count_nonzero(a)) logidf FROM corpus) foo, corpus ORDER BY docnum;
+ docnum |                                                                              tf_idf                                                                               
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------
+      1 | {0,0,0,0,0.693147180559945,0.287682072451781,0,0.693147180559945,0.693147180559945,0,0,0,1.38629436111989,0,0,0.287682072451781,0,1.38629436111989,0}
+      2 | {1.38629436111989,0,0,0,0.693147180559945,0.287682072451781,1.38629436111989,0.693147180559945,0,0,0,0,0,0,1.38629436111989,0.575364144903562,0,0,0}
+      3 | {0,0,1.38629436111989,1.38629436111989,0,0,0,0,0,0.693147180559945,1.38629436111989,1.38629436111989,0,1.38629436111989,0,0,0.693147180559945,0,1.38629436111989}
+      4 | {0,1.38629436111989,0,0,0,0.575364144903562,0,0,0.693147180559945,0.693147180559945,0,0,0,0,0,0.575364144903562,0.693147180559945,0,0}
+(4 rows)
+
+Time: 13.013 ms
+\qecho Show the same calculation in compressed vector format
+Show the same calculation in compressed vector format
+SELECT docnum, (a*logidf) tf_idf FROM (SELECT log(count(a)/vec_count_nonzero(a)) logidf FROM corpus) foo, corpus ORDER BY docnum;
+ docnum |                                                                          tf_idf                                                                          
+--------+----------------------------------------------------------------------------------------------------------------------------------------------------------
+      1 | {4,1,1,1,2,3,1,2,1,1,1,1}:{0,0.693147180559945,0.287682072451781,0,0.693147180559945,0,1.38629436111989,0,0.287682072451781,0,1.38629436111989,0}
+      2 | {1,3,1,1,1,1,6,1,1,3}:{1.38629436111989,0,0.693147180559945,0.287682072451781,1.38629436111989,0.693147180559945,0,1.38629436111989,0.575364144903562,0}
+      3 | {2,2,5,1,2,1,1,2,1,1,1}:{0,1.38629436111989,0,0.693147180559945,1.38629436111989,0,1.38629436111989,0,0.693147180559945,0,1.38629436111989}
+      4 | {1,1,3,1,2,2,5,1,1,2}:{0,1.38629436111989,0,0.575364144903562,0,0.693147180559945,0,0.575364144903562,0.693147180559945,0}
+(4 rows)
+
+Time: 5.794 ms
+\qecho Create a table with TF / IDF weighted vectors in it
+Create a table with TF / IDF weighted vectors in it
+DROP TABLE IF EXISTS WEIGHTS;
+DROP TABLE
+Time: 11.575 ms
+CREATE TABLE weights AS (SELECT docnum, (a*logidf) tf_idf FROM (SELECT log(count(a)/vec_count_nonzero(a)) logidf FROM corpus) foo, corpus ORDER BY docnum) DISTRIBUTED RANDOMLY;
+SELECT 4
+Time: 101.241 ms
+\qecho Calculate the angular distance between the first document to each other document
+Calculate the angular distance between the first document to each other document
+SELECT docnum,trunc((180.*(ACOS(dmin(1.,(tf_idf%*%testdoc)/(l2norm(tf_idf)*l2norm(testdoc))))/(4.*ATAN(1.))))::numeric,2) angular_distance FROM weights,(SELECT tf_idf testdoc FROM weights WHERE docnum = 1 LIMIT 1) foo ORDER BY 1;
+ docnum | angular_distance 
+--------+------------------
+      1 |             0.00
+      2 |            78.82
+      3 |            90.00
+      4 |            80.02
+(4 rows)
+
+Time: 8.357 ms

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/gp_sparse_vector/sparse_vector.c
----------------------------------------------------------------------
diff --git a/contrib/gp_sparse_vector/sparse_vector.c b/contrib/gp_sparse_vector/sparse_vector.c
new file mode 100644
index 0000000..88d6047
--- /dev/null
+++ b/contrib/gp_sparse_vector/sparse_vector.c
@@ -0,0 +1,392 @@
+/**
+ * @file
+ * Sparse Vector Datatype
+ *   We would like to store sparse arrays in a terse representation that fits 
+ *   in a small amount of memory.
+ *   We also want to be able to compare the number of instances where the svec 
+ *   of one document intersects another.
+ */
+
+#include <postgres.h>
+
+#include <stdio.h>
+#include <string.h>
+#include <search.h>
+#include <stdlib.h>
+#include <math.h>
+
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "catalog/pg_type.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "utils/fmgroids.h"
+#include "lib/stringinfo.h"
+#include "utils/memutils.h"
+#include "sparse_vector.h"
+
+/**
+ * @return An array of float8s obtained by converting a given sparse vector
+ */
+ArrayType *svec_return_array_internal(SvecType *svec)
+{
+	SparseData sdata = sdata_from_svec(svec);
+	double *array = sdata_to_float8arr(sdata);
+
+	ArrayType *pgarray = construct_array((Datum *)array,
+					     sdata->total_value_count,FLOAT8OID,
+					     sizeof(float8),true,'d');
+
+	pfree(array);
+	return(pgarray);
+}
+
+/* 
+ * Must serialize for binary communication with libpq by
+ * creating a StringInfo and sending individual data items like:
+ *   (from backend/libpq/pqformat.c):
+ *      pq_beginmessage - initialize StringInfo buffer
+ *      pq_sendbyte     - append a raw byte to a StringInfo buffer
+ *      pq_sendint      - append a binary integer to a StringInfo buffer
+ *      pq_sendint64    - append a binary 8-byte int to a StringInfo buffer
+ *      pq_sendfloat4   - append a float4 to a StringInfo buffer
+ *      pq_sendfloat8   - append a float8 to a StringInfo buffer
+ *      pq_sendbytes    - append raw data to a StringInfo buffer
+ *      pq_sendcountedtext - append a counted text string (with character set conversion)
+ *      pq_sendtext     - append a text string (with conversion)
+ *      pq_sendstring   - append a null-terminated text string (with conversion)
+ *      pq_send_ascii_string - append a null-terminated text string (without conversion)
+ *      pq_endmessage   - send the completed message to the frontend
+ *
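 * The wire format written below is, in order: the element type Oid, the
+ * unique and total value counts, the byte lengths of the value and index
+ * arrays, and finally the raw bytes of both arrays; svec_recv() reads
+ * the same fields back in the same order.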
+ */
+
+PG_FUNCTION_INFO_V1(svec_send);
+/**
+ *  svec_send - serializes an svec into the binary wire format
+ */
+Datum svec_send(PG_FUNCTION_ARGS)
+{
+	StringInfoData buf;
+	SvecType *svec = PG_GETARG_SVECTYPE_P(0);
+	SparseData sdata = sdata_from_svec(svec);
+
+	pq_begintypsend(&buf);
+	pq_sendint(&buf,sdata->type_of_data,sizeof(Oid));
+	pq_sendint(&buf,sdata->unique_value_count,sizeof(int));
+	pq_sendint(&buf,sdata->total_value_count,sizeof(int));
+	pq_sendint(&buf,sdata->vals->len,sizeof(int));
+	pq_sendint(&buf,sdata->index->len,sizeof(int));
+	pq_sendbytes(&buf,sdata->vals->data,sdata->vals->len);
+	pq_sendbytes(&buf,sdata->index->data,sdata->index->len);
+
+	PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
+}
+
+PG_FUNCTION_INFO_V1(svec_recv);
+/**
+ *  svec_recv - reconstructs an svec from the external binary format
+ */
+Datum svec_recv(PG_FUNCTION_ARGS)
+{
+	StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
+	SvecType *svec;
+
+	SparseData sdata = makeEmptySparseData();
+	sdata->type_of_data       = pq_getmsgint(buf, sizeof(int));
+	sdata->unique_value_count = pq_getmsgint(buf, sizeof(int));
+	sdata->total_value_count  = pq_getmsgint(buf, sizeof(int));
+	sdata->vals->len          = pq_getmsgint(buf, sizeof(int));
+	sdata->index->len         = pq_getmsgint(buf, sizeof(int));
+	sdata->vals->data         = (char *)pq_getmsgbytes(buf,sdata->vals->len);
+	sdata->index->data        = (char *)pq_getmsgbytes(buf,sdata->index->len);
+	svec = svec_from_sparsedata(sdata,true); // Note: this copies the data
+	/* The vals and index data still point into the message buffer, so
+	 * free only the SparseData struct itself, not its data. */
+	pfree(sdata);
+
+	PG_RETURN_SVECTYPE_P(svec);
+}
+
+PG_FUNCTION_INFO_V1( svec_return_array );
+/**
+ *  svec_return_array - returns an uncompressed Array
+ */
+Datum svec_return_array(PG_FUNCTION_ARGS)
+{
+	SvecType *svec = PG_GETARG_SVECTYPE_P(0);
+	ArrayType *pgarray = svec_return_array_internal(svec);
+	PG_RETURN_ARRAYTYPE_P(pgarray);
+}
+
+PG_FUNCTION_INFO_V1(svec_out);
+/**
+ *  svec_out - outputs a sparse vector as a C string
+ */
+Datum svec_out(PG_FUNCTION_ARGS)
+{
+	SvecType *svec = PG_GETARG_SVECTYPE_P(0);
+	char *result = svec_out_internal(svec);
+	PG_RETURN_CSTRING(result);
+}
+
+char * svec_out_internal(SvecType *svec)
+{
+	char *ix_string,*vals_string,*result;
+	int ixlen,vslen;
+	SparseData sdata=sdata_from_svec(svec);
+	int64 *array_ix =sdata_index_to_int64arr(sdata);
+	ArrayType *pgarray_ix,*pgarray_vals;
+
+	pgarray_ix = construct_array((Datum *)array_ix,
+				     sdata->unique_value_count,INT8OID,
+				     sizeof(int64),true,'d');
+
+	ix_string = DatumGetPointer(OidFunctionCall1(F_ARRAY_OUT,
+					 PointerGetDatum(pgarray_ix)));
+	ixlen = strlen(ix_string);
+
+	pgarray_vals = construct_array((Datum *)sdata->vals->data,
+				       sdata->unique_value_count,FLOAT8OID,
+				       sizeof(float8),true,'d');
+
+	vals_string = DatumGetPointer(OidFunctionCall1(F_ARRAY_OUT,
+					 PointerGetDatum(pgarray_vals)));
+	vslen = strlen(vals_string);
+
+	result = (char *)palloc(sizeof(char)*(vslen+ixlen+1+1));
+
+	/* NULLs are represented as NaN internally; see svec_in();
+	 * Here we print each NaN as an NVP. */
+	for (int i=0; i!=vslen; i++) 
+		if (vals_string[i] == 'N') 
+		{
+			vals_string[i+1] = 'V';
+			vals_string[i+2] = 'P';
+			i = i+2;
+		}
+
+	sprintf(result,"%s:%s",ix_string,vals_string);
+	pfree(ix_string);
+	pfree(vals_string);
+	pfree(array_ix);
+
+	return(result);
+}
+
+SvecType * svec_in_internal(char * str);
+
+PG_FUNCTION_INFO_V1(svec_in);
+/**
+ *  svec_in - reads in a string and convert that to an svec
+ */
+Datum svec_in(PG_FUNCTION_ARGS)
+{
+	char *str = pstrdup(PG_GETARG_CSTRING(0));
+	SvecType *result = svec_in_internal(str);
+	PG_RETURN_SVECTYPE_P(result);
+}
+
+SvecType * svec_in_internal(char * str)
+{
+	char *values;
+	ArrayType *pgarray_vals,*pgarray_ix;
+	double *vals, *vals_temp;
+	StringInfo index;
+	int64 *u_index;
+	int32_t num_values,total_value_count;
+	SparseData sdata;
+	SvecType *result;
+	bits8 *bitmap;
+	int bitmask;
+	int i,j;
+
+	/* Read in the two arrays defining the Sparse Vector, first is the array
+	 * of run lengths (the count array), the second is an array of the 
+	 * unique values (the data array).
+	 *
+	 * The input format is a pair of standard Postgres arrays separated by 
+	 * a colon, like this:
+	 * 	{1,10,1,5,1}:{4.3,0,0.2,0,7.4}
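	 * which expands to an 18-element dense vector: 4.3, then ten 0s,
+	 * then 0.2, then five 0s, then 7.4 (each count gives the run length
+	 * of the corresponding value).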
+	 *
+	 * For now, the data array must only have float8 elements.
+	 */
+	if ((values=strchr(str,':')) == NULL) {
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("Invalid input string for svec")));
+	} else {
+		*values = '\0';
+		values = values+1;
+	}
+	/* Get the count and data arrays */
+	pgarray_ix = DatumGetArrayTypeP(
+			    OidFunctionCall3(F_ARRAY_IN,CStringGetDatum(str),
+			    ObjectIdGetDatum(INT8OID),Int32GetDatum(-1)));
+
+	pgarray_vals = DatumGetArrayTypeP(
+			    OidFunctionCall3(F_ARRAY_IN,CStringGetDatum(values),
+			    ObjectIdGetDatum(FLOAT8OID),Int32GetDatum(-1)));
+
+	num_values = *(ARR_DIMS(pgarray_ix));
+	u_index = (int64 *)ARR_DATA_PTR(pgarray_ix);
+	vals = (double *)ARR_DATA_PTR(pgarray_vals);
+
+	/* The count and value arrays must be non-empty */
+	int size1 = ARR_NDIM(pgarray_ix);
+	int size2 = ARR_NDIM(pgarray_vals);
+	if (size1 == 0 || size2 == 0)
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("The count and value arrays must be non-empty")));
+
+	/* The count and value arrays must have the same dimension */
+	if (num_values != *(ARR_DIMS(pgarray_vals)))
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("Unique value count not equal to run length count %d != %d", num_values, *(ARR_DIMS(pgarray_vals)))));
+
+	/* Count array cannot have NULLs */
+	if (ARR_HASNULL(pgarray_ix))
+		ereport(ERROR,
+			(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+			 errmsg("NULL value in the count array.")));
+
+	/* If the data array has NULLs, then we need to create an array to
+	 * store the NULL values as NVP values defined in float_specials.h. 
+	 */
+	if (ARR_HASNULL(pgarray_vals)) {
+		vals_temp = vals;
+		vals = (double *)palloc(sizeof(float8) * num_values);
+		bitmap = ARR_NULLBITMAP(pgarray_vals);
+		bitmask = 1;
+		j = 0;
+		for (i=0; i<num_values; i++) {
+			if (bitmap && (*bitmap & bitmask) == 0) // if NULL
+				vals[i] = NVP;
+			else { 
+				vals[i] = vals_temp[j];
+				j++;
+			}
+			if (bitmap) { // advance bitmap pointer
+				bitmask <<= 1;
+				if (bitmask == 0x100) {
+					bitmap++;
+					bitmask = 1;
+				}
+			}
+		}
+	 }
+
+	/* Make an empty StringInfo because we have the data array already */
+	index = makeStringInfo();
+	total_value_count = 0;
+	for (int i=0;i<num_values;i++) {
+		if (u_index[i] <= 0) 
+			ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("Non-positive run length in input")));
+
+		total_value_count+=u_index[i]; 
+		append_to_rle_index(index,u_index[i]);
+	}
+
+	sdata = makeInplaceSparseData((char *)vals,index->data,
+			num_values*sizeof(double),index->len,FLOAT8OID,
+			num_values,total_value_count);
+	sdata->type_of_data = FLOAT8OID;
+
+	result = svec_from_sparsedata(sdata,true);
+	if (total_value_count == 1) result->dimension = -1; //Scalar
+
+	if (ARR_HASNULL(pgarray_vals)) pfree(vals);
+	pfree(str); /* str was allocated with pstrdup */
+	pfree(pgarray_ix);
+	pfree(pgarray_vals);
+
+	return result;
+}
+
+/**
+ * Produces an svec from a SparseData
+ */
+SvecType *svec_from_sparsedata(SparseData sdata, bool trim)
+{
+	int size;
+
+	if (trim)
+	{
+		/* Trim the extra space off of the StringInfo dynamic strings
+		 * before serializing the SparseData
+		 */
+		sdata->vals->maxlen=sdata->vals->len;
+		sdata->index->maxlen=sdata->index->len;
+	}
+
+	size = SVECHDRSIZE + SIZEOF_SPARSEDATASERIAL(sdata);
+
+	SvecType *result = (SvecType *)palloc(size);
+	SET_VARSIZE(result,size);
+	serializeSparseData(SVEC_SDATAPTR(result),sdata);
+	result->dimension = sdata->total_value_count;
+	if (result->dimension == 1) result->dimension=-1; //Scalar
+	return (result);
+}
+
+/**
+ * Produces an svec from an array
+ */
+SvecType *svec_from_float8arr(float8 *array, int dimension)
+{
+	SparseData sdata = float8arr_to_sdata(array,dimension);
+	SvecType *result = svec_from_sparsedata(sdata,true);
+	return result;
+}
+
+/**
+ * Makes an empty svec with sufficient memory allocated for the input number
+ */
+SvecType *makeEmptySvec(int allocation)
+{
+	int val_len = sizeof(float8)*allocation+1;
+	int ind_len = 9*allocation+1;
+	SvecType *svec;
+	SparseData sdata = makeEmptySparseData();
+	sdata->vals->data    = (char *)palloc(val_len);
+	sdata->vals->len = 0;
+	sdata->vals->maxlen  = val_len;
+	sdata->index->data   = (char *)palloc(ind_len);
+	sdata->index->len = 0;
+	sdata->index->maxlen = ind_len;
+	svec = svec_from_sparsedata(sdata,false);
+	freeSparseDataAndData(sdata);
+	return(svec);
+}
+
+/**
+ * Allocates more space for the count and data arrays of an svec
+ */
+SvecType *reallocSvec(SvecType *source)
+{
+	SvecType *svec;
+	SparseData sdata = sdata_from_svec(source);
+	int val_newmaxlen = Max(2*sizeof(float8)+1, 2 * (Size) sdata->vals->maxlen);
+	char *newvals = (char *)palloc(val_newmaxlen);
+	int ind_newmaxlen = Max(2*sizeof(int8)+1, 2 * (Size) sdata->index->maxlen);
+	char *newindex = (char *)palloc(ind_newmaxlen);
+	/*
+	 * This space was never allocated with palloc, so we can't repalloc it!
+	 */
+	memcpy(newvals ,sdata->vals->data ,sdata->vals->len);
+	memcpy(newindex,sdata->index->data,sdata->index->len);
+	sdata->vals->data    = newvals;
+	sdata->vals->maxlen  = val_newmaxlen;
+	sdata->index->data   = newindex;
+	sdata->index->maxlen = ind_newmaxlen;
+	svec = svec_from_sparsedata(sdata,false);
+//	pfree(source);
+	return(svec);
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/gp_sparse_vector/sparse_vector.h
----------------------------------------------------------------------
diff --git a/contrib/gp_sparse_vector/sparse_vector.h b/contrib/gp_sparse_vector/sparse_vector.h
new file mode 100644
index 0000000..05a7357
--- /dev/null
+++ b/contrib/gp_sparse_vector/sparse_vector.h
@@ -0,0 +1,156 @@
+/**
+ * @file
+ * \brief Persistent storage for the Sparse Vector Datatype
+ *
+ */
+
+#ifndef SPARSEVECTOR_H
+#define SPARSEVECTOR_H
+
+#include "SparseData.h"
+#include "float_specials.h"
+
+/*!
+ * \internal
+ * Consists of the dimension of the vector (how many elements) and a SparseData
+ * structure that stores the data in a compressed format.
+ * \endinternal
+ */
+typedef struct {
+	int32 vl_len_;   /**< Standard varlena header word (total size in bytes) */
+	int32 dimension; /**< Number of elements in this vector, special case is -1 indicates a scalar */
+	char data[1];   /**< The serialized SparseData representing the vector here */
+} SvecType;
+
+#define DatumGetSvecTypeP(X)           ((SvecType *) PG_DETOAST_DATUM(X))
+#define DatumGetSvecTypePCopy(X)       ((SvecType *) PG_DETOAST_DATUM_COPY(X))
+#define PG_GETARG_SVECTYPE_P(n)        DatumGetSvecTypeP(PG_GETARG_DATUM(n))
+#define PG_GETARG_SVECTYPE_P_COPY(n)   DatumGetSvecTypePCopy(PG_GETARG_DATUM(n))
+#define PG_RETURN_SVECTYPE_P(x)        PG_RETURN_POINTER(x)
+
+/* Below are the locations of the SparseData values within the serialized
+ * inline SparseData below the Svec header
+ *
+ * All macros take an (SvecType *) as argument
+ */
+#define SVECHDRSIZE	(VARHDRSZ + sizeof(int32))
+/* Beginning of the serialized SparseData */
+#define SVEC_SDATAPTR(x)	((char *)(x)+SVECHDRSIZE)
+#define SVEC_SIZEOFSERIAL(x)	(SVECHDRSIZE+SIZEOF_SPARSEDATASERIAL((SparseData)SVEC_SDATAPTR(x)))
+#define SVEC_UNIQUE_VALCNT(x)	(SDATA_UNIQUE_VALCNT(SVEC_SDATAPTR(x)))
+#define SVEC_TOTAL_VALCNT(x)	(SDATA_TOTAL_VALCNT(SVEC_SDATAPTR(x)))
+#define SVEC_DATA_SIZE(x) 	(SDATA_DATA_SIZE(SVEC_SDATAPTR(x)))
+#define SVEC_VALS_PTR(x)	(SDATA_VALS_PTR(SVEC_SDATAPTR(x)))
+/* The size of the index is variable unlike the values, so in the serialized
+ * SparseData, we include an int32 that indicates the size of the index.
+ */
+#define SVEC_INDEX_SIZE(x) 	(SDATA_INDEX_SIZE(SVEC_SDATAPTR(x)))
+#define SVEC_INDEX_PTR(x) 	(SDATA_INDEX_PTR(SVEC_SDATAPTR(x)))
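+/* Overall, the serialized layout of an SvecType is:
+ *   | vl_len_ | dimension | serialized SparseData ... |
+ * where the two leading int32 fields together occupy SVECHDRSIZE bytes.
+ */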
+
+/** @return True if input is a scalar */
+#define IS_SCALAR(x)	(((x)->dimension) < 0 ? 1 : 0 )
+
+/** @return True if input is a NULL, represented internally as an NVP */
+#define IS_NVP(x)  (memcmp(&(x),&(NVP),sizeof(double)) == 0)
+
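+/** Encodes which of the two inputs are scalars: 0 = neither, 1 = only
+ *  the left, 2 = only the right, 3 = both. */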
+static inline int check_scalar(int i1, int i2)
+{
+	if ((!i1) && (!i2)) return(0);
+	else if (i1 && i2) return(3);
+	else if (i1)  return(1);
+	else if (i2) return(2);
+	return(0);
+}
+
+/*
+ * This routine supplies a pointer to a SparseData derived from an SvecType.
+ * The SvecType is a serialized structure with fixed memory allocations, so
+ * care must be taken not to append to the embedded StringInfo structs
+ * without re-serializing the SparseData into the SvecType.
+ */
+static inline SparseData sdata_from_svec(SvecType *svec)
+{
+	char *sdataptr   = SVEC_SDATAPTR(svec);
+	SparseData sdata = (SparseData)sdataptr;
+	sdata->vals  = (StringInfo)SDATA_DATA_SINFO(sdataptr);
+	sdata->index = (StringInfo)SDATA_INDEX_SINFO(sdataptr);
+	sdata->vals->data   = SVEC_VALS_PTR(svec);
+	if (sdata->index->maxlen == 0)
+	{
+		sdata->index->data = NULL;
+	} else
+	{
+		sdata->index->data  = SVEC_INDEX_PTR(svec);
+	}
+	return(sdata);
+}
+
+static inline void printout_svec(SvecType *svec, char *msg, int stop);
+static inline void printout_svec(SvecType *svec, char *msg, int stop)
+{
+	printout_sdata((SparseData)SVEC_SDATAPTR(svec), msg, stop);
+	elog(NOTICE,"len,dimension=%d,%d",VARSIZE(svec),svec->dimension);
+}
+
+char *svec_out_internal(SvecType *svec);
+SvecType *svec_from_sparsedata(SparseData sdata,bool trim);
+ArrayType *svec_return_array_internal(SvecType *svec);
+SvecType *svec_make_scalar(float8 value);
+SvecType *svec_from_float8arr(float8 *array, int dimension);
+SvecType *op_svec_by_svec_internal(enum operation_t operation, SvecType *svec1, SvecType *svec2);
+SvecType *svec_operate_on_sdata_pair(int scalar_args,enum operation_t operation,SparseData left,SparseData right);
+SvecType *makeEmptySvec(int allocation);
+SvecType *reallocSvec(SvecType *source);
+
+Datum svec_in(PG_FUNCTION_ARGS);
+Datum svec_out(PG_FUNCTION_ARGS);
+Datum svec_return_vector(PG_FUNCTION_ARGS);
+Datum svec_return_array(PG_FUNCTION_ARGS);
+Datum svec_send(PG_FUNCTION_ARGS);
+Datum svec_recv(PG_FUNCTION_ARGS);
+
+// Operators
+Datum svec_pow(PG_FUNCTION_ARGS);
+Datum svec_equals(PG_FUNCTION_ARGS);
+Datum svec_minus(PG_FUNCTION_ARGS);
+Datum svec_plus(PG_FUNCTION_ARGS);
+Datum svec_div(PG_FUNCTION_ARGS);
+Datum svec_dot(PG_FUNCTION_ARGS);
+Datum svec_mult(PG_FUNCTION_ARGS);
+Datum svec_summate(PG_FUNCTION_ARGS);
+
+Datum float8arr_minus_float8arr(PG_FUNCTION_ARGS);
+Datum svec_minus_float8arr(PG_FUNCTION_ARGS);
+Datum float8arr_minus_svec(PG_FUNCTION_ARGS);
+Datum float8arr_plus_float8arr(PG_FUNCTION_ARGS);
+Datum svec_plus_float8arr(PG_FUNCTION_ARGS);
+Datum float8arr_plus_svec(PG_FUNCTION_ARGS);
+Datum float8arr_mult_float8arr(PG_FUNCTION_ARGS);
+Datum svec_mult_float8arr(PG_FUNCTION_ARGS);
+Datum float8arr_mult_svec(PG_FUNCTION_ARGS);
+Datum float8arr_div_float8arr(PG_FUNCTION_ARGS);
+Datum svec_div_float8arr(PG_FUNCTION_ARGS);
+Datum float8arr_div_svec(PG_FUNCTION_ARGS);
+Datum svec_dot_float8arr(PG_FUNCTION_ARGS);
+Datum float8arr_dot_svec(PG_FUNCTION_ARGS);
+
+// Casts
+Datum svec_cast_int2(PG_FUNCTION_ARGS);
+Datum svec_cast_int4(PG_FUNCTION_ARGS);
+Datum svec_cast_int8(PG_FUNCTION_ARGS);
+Datum svec_cast_float4(PG_FUNCTION_ARGS);
+Datum svec_cast_float8(PG_FUNCTION_ARGS);
+Datum svec_cast_numeric(PG_FUNCTION_ARGS);
+
+Datum float8arr_cast_int2(PG_FUNCTION_ARGS);
+Datum float8arr_cast_int4(PG_FUNCTION_ARGS);
+Datum float8arr_cast_int8(PG_FUNCTION_ARGS);
+Datum float8arr_cast_float4(PG_FUNCTION_ARGS);
+Datum float8arr_cast_float8(PG_FUNCTION_ARGS);
+Datum float8arr_cast_numeric(PG_FUNCTION_ARGS);
+
+Datum svec_cast_float8arr(PG_FUNCTION_ARGS);
+Datum svec_cast_positions_float8arr(PG_FUNCTION_ARGS);
+
+#endif  /* SPARSEVECTOR_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/gp_sparse_vector/test_output
----------------------------------------------------------------------
diff --git a/contrib/gp_sparse_vector/test_output b/contrib/gp_sparse_vector/test_output
new file mode 100644
index 0000000..e067d70
--- /dev/null
+++ b/contrib/gp_sparse_vector/test_output
@@ -0,0 +1,296 @@
+-- \set FETCH_COUNT 100
+\timing
+Timing is on.
+drop table if exists test;
+DROP TABLE
+Time: 1.163 ms
+create table test (a int, b svec) DISTRIBUTED BY (a);
+CREATE TABLE
+Time: 67.692 ms
+insert into test (select 1,gp_extract_feature_histogram('{"one","two","three","four","five","six"}','{"twe","four","five","six","one","three","two","one"}'));
+INSERT 0 1
+Time: 72.360 ms
+insert into test (select 2,gp_extract_feature_histogram('{"one","two","three","four","five","six"}','{"the","brown","cat","ran","across","three","dogs"}'));
+INSERT 0 1
+Time: 4.543 ms
+insert into test (select 3,gp_extract_feature_histogram('{"one","two","three","four","five","six"}','{"two","four","five","six","one","three","two","one"}'));
+INSERT 0 1
+Time: 3.701 ms
+-- Test the equals operator (should be only 3 rows)
+select a,b::float8[] cross_product_equals from (select a,b from test) foo where b = foo.b order by a;
+ a | cross_product_equals 
+---+----------------------
+ 1 | {2,1,1,1,1,1}
+ 2 | {0,0,1,0,0,0}
+ 3 | {2,2,1,1,1,1}
+(3 rows)
+
+Time: 4.044 ms
+drop table if exists test2;
+DROP TABLE
+Time: 0.217 ms
+create table test2 as select * from test DISTRIBUTED BY (a);
+SELECT 3
+Time: 64.780 ms
+-- Test the plus operator (should be 9 rows)
+select (t1.b+t2.b)::float8[] cross_product_sum from test t1, test2 t2 order by t1.a;
+ cross_product_sum 
+-------------------
+ {4,3,2,2,2,2}
+ {4,2,2,2,2,2}
+ {2,1,2,1,1,1}
+ {0,0,2,0,0,0}
+ {2,1,2,1,1,1}
+ {2,2,2,1,1,1}
+ {4,4,2,2,2,2}
+ {4,3,2,2,2,2}
+ {2,2,2,1,1,1}
+(9 rows)
+
+Time: 7.121 ms
+-- Test ORDER BY
+select (t1.b+t2.b)::float8[] cross_product_sum, l2norm(t1.b+t2.b) l2norm, (t1.b+t2.b) sparse_vector from test t1, test2 t2 order by 3;
+ cross_product_sum |      l2norm      |    sparse_vector    
+-------------------+------------------+---------------------
+ {0,0,2,0,0,0}     |                2 | {2,1,3}:{0,2,0}
+ {2,1,2,1,1,1}     | 3.46410161513775 | {1,1,1,3}:{2,1,2,1}
+ {2,1,2,1,1,1}     | 3.46410161513775 | {1,1,1,3}:{2,1,2,1}
+ {2,2,2,1,1,1}     | 3.87298334620742 | {3,3}:{2,1}
+ {2,2,2,1,1,1}     | 3.87298334620742 | {3,3}:{2,1}
+ {4,2,2,2,2,2}     |                6 | {1,5}:{4,2}
+ {4,3,2,2,2,2}     | 6.40312423743285 | {1,1,4}:{4,3,2}
+ {4,3,2,2,2,2}     | 6.40312423743285 | {1,1,4}:{4,3,2}
+ {4,4,2,2,2,2}     | 6.92820323027551 | {2,4}:{4,2}
+(9 rows)
+
+Time: 5.359 ms
+ select (sum(t1.b))::float8[] as features_sum from test t1;
+ features_sum  
+---------------
+ {4,3,3,2,2,2}
+(1 row)
+
+Time: 2.844 ms
+-- Test the div operator
+ select (t1.b/(select sum(b) from test))::float8[] as weights from test t1 order by a;
+                        weights                        
+-------------------------------------------------------
+ {0.5,0.333333333333333,0.333333333333333,0.5,0.5,0.5}
+ {0,0,0.333333333333333,0,0,0}
+ {0.5,0.666666666666667,0.333333333333333,0.5,0.5,0.5}
+(3 rows)
+
+Time: 4.229 ms
+-- Test the %*% (dot product) operator
+ select t1.b %*% (t1.b/(select sum(b) from test)) as raw_score from test t1 order by a;
+     raw_score     
+-------------------
+  3.16666666666667
+ 0.333333333333333
+  4.16666666666667
+(3 rows)
+
+Time: 4.098 ms
+-- Test the %*% and l2norm operators
+ select (t1.b %*% (t1.b/(select sum(b) from test))) / (l2norm(t1.b) * l2norm((select sum(b) from test))) as norm_score from test t1 order by a;
+    norm_score     
+-------------------
+  0.15563317594128
+ 0.049147318718299
+ 0.177345110574739
+(3 rows)
+
+Time: 6.295 ms
+-- Test the ^ and l1norm operators
+select ('{1,2}:{20.,10.}'::svec)^('{1}:{3.}'::svec);
+     ?column?      
+-------------------
+ {1,2}:{8000,1000}
+(1 row)
+
+Time: 1.060 ms
+ select (t1.b %*% (t1.b/(select sum(b) from test))) / (l1norm(t1.b) * l1norm((select sum(b) from test))) as norm_score from test t1 order by a;
+     norm_score     
+--------------------
+ 0.0282738095238095
+ 0.0208333333333333
+ 0.0325520833333333
+(3 rows)
+
+Time: 6.715 ms
+-- Test the multi-concatenation and show sizes compared with a normal array
+drop table if exists corpus_proj;
+DROP TABLE
+Time: 0.258 ms
+drop table if exists corpus_proj_array;
+DROP TABLE
+Time: 0.225 ms
+create table corpus_proj as (select 10000 *|| ('{45,2,35,4,15,1}:{0,1,0,1,0,2}'::svec) result ) distributed randomly;
+SELECT 1
+Time: 91.366 ms
+create table corpus_proj_array as (select result::float8[] from corpus_proj) distributed randomly;
+SELECT 1
+Time: 318.754 ms
+-- Calculate on-disk size of sparse vector
+select pg_size_pretty(pg_total_relation_size('corpus_proj'));
+ pg_size_pretty 
+----------------
+ 192 kB
+(1 row)
+
+Time: 5.393 ms
+-- Calculate on-disk size of normal array
+select pg_size_pretty(pg_total_relation_size('corpus_proj_array'));
+ pg_size_pretty 
+----------------
+ 288 kB
+(1 row)
+
+Time: 3.488 ms
+\timing
+Timing is off.
+-- Calculate L1 norm from sparse vector
+select l1norm(result) from corpus_proj;
+ l1norm 
+--------
+  80000
+(1 row)
+
+-- Calculate L1 norm from float8[]
+select l1norm(result) from corpus_proj_array;
+ l1norm 
+--------
+  80000
+(1 row)
+
+-- Calculate L2 norm from sparse vector
+select l2norm(result) from corpus_proj;
+      l2norm      
+------------------
+ 316.227766016838
+(1 row)
+
+-- Calculate L2 norm from float8[]
+select l2norm(result) from corpus_proj_array;
+      l2norm      
+------------------
+ 316.227766016838
+(1 row)
+
+drop table corpus_proj;
+DROP TABLE
+drop table corpus_proj_array;
+DROP TABLE
+drop table test;
+DROP TABLE
+drop table test2;
+DROP TABLE
+-- Test operators between svec and float8[]
+select ('{1,2,3,4}:{3,4,5,6}'::svec)           %*% ('{1,2,3,4}:{3,4,5,6}'::svec)::float8[];
+ ?column? 
+----------
+      260
+(1 row)
+
+select ('{1,2,3,4}:{3,4,5,6}'::svec)::float8[] %*% ('{1,2,3,4}:{3,4,5,6}'::svec);
+ ?column? 
+----------
+      260
+(1 row)
+
+select ('{1,2,3,4}:{3,4,5,6}'::svec)            /  ('{1,2,3,4}:{3,4,5,6}'::svec)::float8[];
+ ?column? 
+----------
+ {10}:{1}
+(1 row)
+
+select ('{1,2,3,4}:{3,4,5,6}'::svec)::float8[]  /  ('{1,2,3,4}:{3,4,5,6}'::svec);
+ ?column? 
+----------
+ {10}:{1}
+(1 row)
+
+select ('{1,2,3,4}:{3,4,5,6}'::svec)            *  ('{1,2,3,4}:{3,4,5,6}'::svec)::float8[];
+        ?column?        
+------------------------
+ {1,2,3,4}:{9,16,25,36}
+(1 row)
+
+select ('{1,2,3,4}:{3,4,5,6}'::svec)::float8[]  *  ('{1,2,3,4}:{3,4,5,6}'::svec);
+        ?column?        
+------------------------
+ {1,2,3,4}:{9,16,25,36}
+(1 row)
+
+select ('{1,2,3,4}:{3,4,5,6}'::svec)            +  ('{1,2,3,4}:{3,4,5,6}'::svec)::float8[];
+       ?column?        
+-----------------------
+ {1,2,3,4}:{6,8,10,12}
+(1 row)
+
+select ('{1,2,3,4}:{3,4,5,6}'::svec)::float8[]  +  ('{1,2,3,4}:{3,4,5,6}'::svec);
+       ?column?        
+-----------------------
+ {1,2,3,4}:{6,8,10,12}
+(1 row)
+
+select ('{1,2,3,4}:{3,4,5,6}'::svec)            -  ('{1,2,3,4}:{3,4,5,6}'::svec)::float8[];
+ ?column? 
+----------
+ {10}:{0}
+(1 row)
+
+select ('{1,2,3,4}:{3,4,5,6}'::svec)::float8[]  -  ('{1,2,3,4}:{3,4,5,6}'::svec);
+ ?column? 
+----------
+ {10}:{0}
+(1 row)
+
+-- Test the pivot operator in the presence of NULL values
+drop table if exists pivot_test;
+DROP TABLE
+create table pivot_test(a float8) distributed randomly;
+CREATE TABLE
+insert into pivot_test values (0),(1),(NULL),(2),(3);
+INSERT 0 5
+select array_agg(a) from pivot_test;
+      array_agg      
+---------------------
+ {1,1,2,1}:{1,2,0,3}
+(1 row)
+
+select l1norm(array_agg(a)) from pivot_test;
+ l1norm 
+--------
+      6
+(1 row)
+
+drop table if exists pivot_test;
+DROP TABLE
+-- Answer should be 5
+select vec_median(array_agg(a)) from (select generate_series(1,9) a) foo;
+ vec_median 
+------------
+          5
+(1 row)
+
+-- Answer should be a 10-wide vector
+select array_agg(a) from (select trunc(random()*10) a,generate_series(1,100000) order by a) foo;
+                                   array_agg                                   
+-------------------------------------------------------------------------------
+ {9946,10172,10063,9850,9874,10139,9964,9923,10131,9938}:{0,1,2,3,4,5,6,7,8,9}
+(1 row)
+
+-- Average is 4.50034, median is 5
+select vec_median('{9960,9926,10053,9993,10080,10050,9938,9941,10030,10029}:{1,9,8,7,6,5,4,3,2,0}'::svec);
+ vec_median 
+------------
+          5
+(1 row)
+
+select vec_median('{9960,9926,10053,9993,10080,10050,9938,9941,10030,10029}:{1,9,8,7,6,5,4,3,2,0}'::svec::float8[]);
+ vec_median 
+------------
+          5
+(1 row)
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/.gitignore
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/.gitignore b/contrib/hawq-hadoop/.gitignore
new file mode 100644
index 0000000..2f7896d
--- /dev/null
+++ b/contrib/hawq-hadoop/.gitignore
@@ -0,0 +1 @@
+target/

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/Makefile
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/Makefile b/contrib/hawq-hadoop/Makefile
new file mode 100644
index 0000000..781ba99
--- /dev/null
+++ b/contrib/hawq-hadoop/Makefile
@@ -0,0 +1,70 @@
+# HAWQ-HADOOP Makefile
+
+ifdef USE_PGXS
+PGXS := $(shell pg_config --pgxs)
+include $(PGXS)
+else
+subdir = contrib/hawq-hadoop
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
+
+# Maven subprojects
+SUBPROJECTS = hawq-mapreduce-ao hawq-mapreduce-parquet hawq-mapreduce-common hawq-mapreduce-tool
+DEPENDENCIES = postgresql-9.2-1003-jdbc4.jar snakeyaml-1.12.jar parquet-column-1.1.0.jar parquet-common-1.1.0.jar parquet-encoding-1.1.0.jar parquet-format-1.0.0.jar parquet-hadoop-1.1.0.jar snappy-java-1.1.0.jar
+VERSION = 1.1.0
+DEFAULTTARGET = hawq-mapreduce-tool/target/hawq-mapreduce-tool-$(VERSION).jar
+JARDIR = hawq-mr-io
+
+# Now the makefile targets that do the work.
+# The default target:
+all: $(DEFAULTTARGET)
+
+$(DEFAULTTARGET):
+ifdef MAVEN
+	$(MAVEN) package -DskipTests
+else
+	@$(missing) mvn $< $@
+endif
+
+# Make clean distclean
+clean distclean:
+ifdef MAVEN
+	$(MAVEN) clean
+else
+	@$(missing) mvn $< $@
+endif
+
+# Make check
+unittest-check check installcheck:
+ifdef MAVEN
+	$(MAVEN) test
+else
+	@$(missing) mvn $< $@
+endif
+
+# Make install
+install: $(DEFAULTTARGET)
+	@for subpro in $(SUBPROJECTS); do \
+	echo "$(INSTALL_SHLIB) $$subpro/target/$$subpro-$(VERSION).jar '$(DESTDIR)$(pkglibdir)/$(JARDIR)/$$subpro.jar'"; \
+	$(INSTALL_SHLIB) $$subpro/target/$$subpro-$(VERSION).jar $(DESTDIR)$(pkglibdir)/$(JARDIR)/$$subpro.jar; \
+	$(INSTALL_SHLIB) $$subpro/target/$$subpro-$(VERSION)-tests.jar $(DESTDIR)$(pkglibdir)/$(JARDIR)/$$subpro-tests.jar; \
+	done
+	echo "$(INSTALL_SHLIB) target/hawq-hadoop-$(VERSION)-javadoc.jar '$(DESTDIR)$(pkglibdir)/$(JARDIR)/hawq-hadoop-javadoc.jar'"
+	$(INSTALL_SHLIB) target/hawq-hadoop-$(VERSION)-javadoc.jar $(DESTDIR)$(pkglibdir)/$(JARDIR)/hawq-hadoop-javadoc.jar
+	@for dep in $(DEPENDENCIES); do \
+	echo "$(INSTALL_SHLIB) hawq-mapreduce-tool/lib/$$dep '$(DESTDIR)$(pkglibdir)/$(JARDIR)/lib/$$dep'"; \
+	$(INSTALL_SHLIB) hawq-mapreduce-tool/lib/$$dep $(DESTDIR)$(pkglibdir)/$(JARDIR)/lib/$$dep; \
+	done
+
+# Make uninstall
+uninstall:
+	@for subpro in $(SUBPROJECTS); do \
+	rm -rf $(DESTDIR)$(pkglibdir)/$(JARDIR)/$$subpro.jar; \
+	rm -rf $(DESTDIR)$(pkglibdir)/$(JARDIR)/$$subpro-tests.jar; \
+	done
+	@rm -rf $(DESTDIR)$(pkglibdir)/$(JARDIR)/hawq-hadoop-javadoc.jar
+	@for dep in $(DEPENDENCIES); do \
+	rm -rf $(DESTDIR)$(pkglibdir)/$(JARDIR)/lib/$$dep; \
+	done

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/README
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/README b/contrib/hawq-hadoop/README
new file mode 100644
index 0000000..add87a0
--- /dev/null
+++ b/contrib/hawq-hadoop/README
@@ -0,0 +1,21 @@
+Run Unit Test
+-------------
+All unit tests can be run without starting HAWQ and HDFS; simply run:
+$ mvn clean test
+
+
+Run Feature Test
+----------------
+1. Make sure HAWQ is started and has a database named "gptest". The
+   master's host and port are assumed to be localhost:5432, but you can
+   change them using the following environment variables:
+   - PG_BASE_ADDRESS
+   - PG_BASE_PORT
+
+2. Make sure HDFS is started.
+   - HADOOP_HOME    defaults to "/usr/local/hadoop"
+
+
+Code Coverage
+-------------
+$ mvn clean clover2:setup test clover2:aggregate clover2:clover

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-ao/.gitignore
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-ao/.gitignore b/contrib/hawq-hadoop/hawq-mapreduce-ao/.gitignore
new file mode 100644
index 0000000..2f7896d
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-ao/.gitignore
@@ -0,0 +1 @@
+target/

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-ao/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-ao/pom.xml b/contrib/hawq-hadoop/hawq-mapreduce-ao/pom.xml
new file mode 100644
index 0000000..a4e8082
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-ao/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hawq-hadoop</artifactId>
+        <groupId>com.pivotal.hawq</groupId>
+        <version>1.1.0</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hawq-mapreduce-ao</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>hawq-mapreduce-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.atlassian.maven.plugins</groupId>
+                <artifactId>maven-clover2-plugin</artifactId>
+                <configuration>
+                    <licenseLocation>../lib/clover.license</licenseLocation>
+                    <excludes>
+                        <exclude>**/Compress*.java</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/HAWQAOInputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/HAWQAOInputFormat.java b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/HAWQAOInputFormat.java
new file mode 100644
index 0000000..36d5d73
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/HAWQAOInputFormat.java
@@ -0,0 +1,126 @@
+package com.pivotal.hawq.mapreduce.ao;
+
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.file.HAWQAOFileStatus;
+import com.pivotal.hawq.mapreduce.ao.file.HAWQAOSplit;
+import com.pivotal.hawq.mapreduce.conf.HAWQConfiguration;
+
+import com.pivotal.hawq.mapreduce.metadata.HAWQAOTableMetadata;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An InputFormat that reads input data from HAWQ append-only (AO) tables.
+ * <p>
+ * In most cases, you should consider using HAWQInputFormat, which delegates
+ * to this class for AO tables but can handle non-AO tables as well.
+ */
+public final class HAWQAOInputFormat extends FileInputFormat<Void, HAWQRecord>
+{
+	private static final Log LOG = LogFactory.getLog(HAWQAOInputFormat.class);
+
+	private static HAWQAOFileStatus[] fileStatuses = null;
+
+	/**
+	 * Initializes the map part of the job with the appropriate input settings
+	 * by connecting to the database.
+	 * 
+	 * @param conf
+	 *            The map-reduce job configuration
+	 * @param metadata
+	 *            The metadata of this table, obtained from the database or a metadata file
+	 */
+	public static void setInput(Configuration conf, HAWQAOTableMetadata metadata)
+	{
+		HAWQConfiguration.setInputTableEncoding(conf, metadata.getDatabaseEncoding());
+		HAWQConfiguration.setInputTableSchema(conf, metadata.getSchema());
+		/*
+		 * GPSQL-1047
+		 * 
+		 * Store the database version in the configuration so tasks know
+		 * which database environment the data comes from
+		 */
+		HAWQConfiguration.setDatabaseVersion(conf, metadata.getDatabaseVersion());
+		fileStatuses = metadata.getFileStatuses();
+	}
+
+	/**
+	 * Create a record reader for a given split. The framework will call
+	 * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
+	 * the split is used.
+	 * 
+	 * @param split
+	 *            the split to be read
+	 * @param context
+	 *            the information about the task
+	 * @return a new record reader
+	 * @throws IOException
+	 * @throws InterruptedException
+	 */
+	@Override
+	public RecordReader<Void, HAWQRecord> createRecordReader(InputSplit split,
+			TaskAttemptContext context) throws IOException,
+			InterruptedException
+	{
+		// For AO table, we return HAWQAORecordReader
+		RecordReader<Void, HAWQRecord> recordReader = new HAWQAORecordReader();
+		return recordReader;
+	}
+
+	/**
+	 * Generate the list of files and make them into FileSplits.
+	 * 
+	 * @param job
+	 *            the job context
+	 * @throws IOException
+	 */
+	@Override
+	public List<InputSplit> getSplits(JobContext job) throws IOException
+	{
+		List<InputSplit> splits = new ArrayList<InputSplit>();
+		for (int i = 0; i < fileStatuses.length; ++i)
+		{
+			HAWQAOFileStatus aofilestatus = fileStatuses[i];
+			String pathStr = aofilestatus.getFilePath();
+			long fileLength = aofilestatus.getFileLength();
+			// Zero-length files produce no splits.
+			if (fileLength == 0)
+				continue;
+
+			boolean checksum = aofilestatus.getChecksum();
+			String compressType = aofilestatus.getCompressType();
+			int blocksize = aofilestatus.getBlockSize();
+			Path path = new Path(pathStr);
+			FileSystem fs = path.getFileSystem(job.getConfiguration());
+			BlockLocation[] blkLocations = fs.getFileBlockLocations(
+					fs.getFileStatus(path), 0, fileLength);
+			// AO files are not splittable: one split covers the whole file,
+			// located on the hosts of its first block.
+			splits.add(new HAWQAOSplit(path, 0, fileLength, blkLocations[0]
+					.getHosts(), checksum, compressType, blocksize));
+		}
+		job.getConfiguration().setLong(NUM_INPUT_FILES, splits.size());
+		LOG.debug("Total # of splits: " + splits.size());
+		return splits;
+	}
+
+}
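
For context, a minimal sketch of a map-only driver that wires this InputFormat
into a job. The metadata lookup is a placeholder: obtaining a
HAWQAOTableMetadata (from the database or a metadata file) happens outside this
class, so lookupMetadata() below is hypothetical, as is the output path
handling.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.pivotal.hawq.mapreduce.HAWQRecord;
import com.pivotal.hawq.mapreduce.ao.HAWQAOInputFormat;
import com.pivotal.hawq.mapreduce.metadata.HAWQAOTableMetadata;

public class HAWQAOScanDriver {

	// Map-only pass over the table; the key is always null (see
	// HAWQAORecordReader.getCurrentKey), so only the value is used.
	public static class DumpMapper
			extends Mapper<Void, HAWQRecord, Text, NullWritable> {
		@Override
		protected void map(Void key, HAWQRecord value, Context context)
				throws IOException, InterruptedException {
			context.write(new Text(value.toString()), NullWritable.get());
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		HAWQAOTableMetadata metadata = lookupMetadata(); // placeholder
		HAWQAOInputFormat.setInput(conf, metadata);

		Job job = Job.getInstance(conf, "hawq-ao-scan");
		job.setJarByClass(HAWQAOScanDriver.class);
		job.setInputFormatClass(HAWQAOInputFormat.class);
		job.setMapperClass(DumpMapper.class);
		job.setNumReduceTasks(0); // map-only
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		FileOutputFormat.setOutputPath(job, new Path(args[0]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

	private static HAWQAOTableMetadata lookupMetadata() {
		// Hypothetical helper: metadata really comes from the database or a
		// metadata file, as described in HAWQAOInputFormat.setInput().
		throw new UnsupportedOperationException("sketch only");
	}
}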

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/HAWQAORecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/HAWQAORecordReader.java b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/HAWQAORecordReader.java
new file mode 100644
index 0000000..d07352c
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/HAWQAORecordReader.java
@@ -0,0 +1,142 @@
+package com.pivotal.hawq.mapreduce.ao;
+
+import com.pivotal.hawq.mapreduce.conf.HAWQConfiguration;
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.ao.io.HAWQAOFileReader;
+import com.pivotal.hawq.mapreduce.ao.io.HAWQAORecord;
+
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * The record reader breaks the data into key/value pairs for input to the mapper.
+ */
+public class HAWQAORecordReader extends RecordReader<Void, HAWQRecord>
+{
+
+	private HAWQRecord value = null;
+	private HAWQAOFileReader filereader = null;
+	private boolean more = true;
+
+	/**
+	 * Close the record reader.
+	 */
+	@Override
+	public void close() throws IOException
+	{
+		filereader.close();
+	}
+
+	/**
+	 * Get the current key
+	 * 
+	 * @return the current key or null if there is no current key
+	 * @throws IOException
+	 * @throws InterruptedException
+	 */
+	@Override
+	public Void getCurrentKey() throws IOException, InterruptedException
+	{
+		// Always null
+		return null;
+	}
+
+	/**
+	 * Get the current value.
+	 * 
+	 * @return the object that was read
+	 * @throws IOException
+	 * @throws InterruptedException
+	 */
+	@Override
+	public HAWQRecord getCurrentValue() throws IOException,
+			InterruptedException
+	{
+		return value;
+	}
+
+	/**
+	 * The current progress of the record reader through its data.
+	 * 
+	 * @return a number between 0.0 and 1.0 that is the fraction of the data
+	 *         read
+	 * @throws IOException
+	 * @throws InterruptedException
+	 */
+	@Override
+	public float getProgress() throws IOException, InterruptedException
+	{
+		return more ? 0.0f : 1.0f;
+	}
+
+	/**
+	 * Called once at initialization.
+	 * 
+	 * @param split
+	 *            the split that defines the range of records to read
+	 * @param context
+	 *            the information about the task
+	 * @throws IOException
+	 * @throws InterruptedException
+	 */
+	@Override
+	public void initialize(InputSplit split, TaskAttemptContext context)
+			throws IOException, InterruptedException
+	{
+
+		// initialize the value
+		Configuration conf = context.getConfiguration();
+
+		// Extract the parameters needed by HAWQAOFileReader and HAWQAORecord
+		String encoding = HAWQConfiguration.getInputTableEncoding(conf);
+		HAWQSchema schema = HAWQConfiguration.getInputTableSchema(conf);
+		/*
+		 * GPSQL-1047
+		 * 
+		 * Get version from configuration and init HAWQAORecord with it
+		 */
+		String version = HAWQConfiguration.getDatabaseVersion(conf);
+
+		filereader = new HAWQAOFileReader(conf, split);
+
+		try
+		{
+			value = new HAWQAORecord(schema, encoding, version);
+		}
+		catch (HAWQException hawqE)
+		{
+			throw new IOException(hawqE.getMessage());
+		}
+	}
+
+	/**
+	 * Read the next key, value pair.
+	 * 
+	 * @return true if a key/value pair was read
+	 * @throws IOException
+	 * @throws InterruptedException
+	 */
+	@Override
+	public boolean nextKeyValue() throws IOException, InterruptedException
+	{
+		try
+		{
+			if (filereader.readRecord((HAWQAORecord) value))
+			{
+				return true;
+			}
+		}
+		catch (HAWQException hawqE)
+		{
+			throw new IOException(hawqE.getMessage());
+		}
+		more = false;
+		return false;
+	}
+}
\ No newline at end of file
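
The life cycle above follows the standard RecordReader contract. A small
sketch of how the framework drives it; the split and task attempt context are
supplied by the framework and appear here as parameters:

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.pivotal.hawq.mapreduce.HAWQRecord;
import com.pivotal.hawq.mapreduce.ao.HAWQAORecordReader;

final class RecordReaderContract {
	// initialize() once, then nextKeyValue()/getCurrentValue() until
	// exhausted, then close() -- exactly what the framework does.
	static void drain(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		RecordReader<Void, HAWQRecord> reader = new HAWQAORecordReader();
		reader.initialize(split, context);
		try {
			while (reader.nextKeyValue()) {
				HAWQRecord record = reader.getCurrentValue();
				// process the record; getCurrentKey() is always null
			}
		} finally {
			reader.close();
		}
	}
}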

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/file/HAWQAOSplit.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/file/HAWQAOSplit.java b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/file/HAWQAOSplit.java
new file mode 100644
index 0000000..23e65c3
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/file/HAWQAOSplit.java
@@ -0,0 +1,87 @@
+package com.pivotal.hawq.mapreduce.ao.file;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * A section of an input file. Returned by
+ * HAWQAOInputFormat.getSplits(JobContext) and passed to
+ * HAWQAOInputFormat.createRecordReader(InputSplit,TaskAttemptContext).
+ */
+public class HAWQAOSplit extends FileSplit
+{
+	private boolean checksum;
+	private String compressType = null;
+	private int blockSize;
+
+	public HAWQAOSplit()
+	{
+		super();
+	}
+
+	/**
+	 * Constructs a split with host information
+	 * 
+	 * @param file
+	 *            the file name
+	 * @param start
+	 *            the position of the first byte in the file to process
+	 * @param length
+	 *            the number of bytes in the file to process
+	 * @param hosts
+	 *            the list of hosts containing the block, possibly null
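+	 * @param checksum
+	 *            whether checksums are enabled for the file
+	 * @param compressType
+	 *            the compression type of the file, if any
+	 * @param blockSize
+	 *            the block size of the append-only file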
+	 */
+	public HAWQAOSplit(Path file, long start, long length, String[] hosts,
+			boolean checksum, String compressType, int blockSize)
+	{
+		super(file, start, length, hosts);
+		this.checksum = checksum;
+		this.compressType = compressType;
+		this.blockSize = blockSize;
+	}
+
+	@Override
+	public String toString()
+	{
+		return super.toString() + "+" + checksum + "+" + compressType + "+"
+				+ blockSize;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException
+	{
+		super.write(out);
+		out.writeBoolean(checksum);
+		Text.writeString(out, compressType);
+		out.writeInt(blockSize);
+	}
+
+	@Override
+	public void readFields(DataInput in) throws IOException
+	{
+		super.readFields(in);
+		checksum = in.readBoolean();
+		compressType = Text.readString(in);
+		blockSize = in.readInt();
+	}
+
+	public boolean getChecksum()
+	{
+		return checksum;
+	}
+
+	public String getCompressType()
+	{
+		return compressType;
+	}
+
+	public int getBlockSize()
+	{
+		return blockSize;
+	}
+}
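
Since write() and readFields() mirror each other, a split survives the
serialization round trip performed when splits are shipped to tasks. A
minimal sketch with illustrative values (the path, host, and parameters below
are made up for the example):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.Path;

import com.pivotal.hawq.mapreduce.ao.file.HAWQAOSplit;

final class SplitRoundTrip {
	public static void main(String[] args) throws IOException {
		// Illustrative values: a hypothetical segment file with checksums on,
		// zlib compression and a 32 KB AO block size.
		HAWQAOSplit original = new HAWQAOSplit(new Path("/hawq/example/seg0"),
				0, 4096, new String[] { "host1" }, true, "zlib", 32768);

		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		original.write(new DataOutputStream(bytes));

		HAWQAOSplit copy = new HAWQAOSplit();
		copy.readFields(new DataInputStream(
				new ByteArrayInputStream(bytes.toByteArray())));

		// The AO-specific fields come back intact; host locations are not
		// serialized, which matches the FileSplit base class behaviour.
		System.out.println(copy.getChecksum());     // true
		System.out.println(copy.getCompressType()); // zlib
		System.out.println(copy.getBlockSize());    // 32768
	}
}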

