hawq-commits mailing list archives

From h...@apache.org
Subject [3/3] incubator-hawq git commit: HAWQ-86. Fix and re-enable unit test for pxf and external storage
Date Mon, 02 Nov 2015 06:13:59 GMT
HAWQ-86. Fix and re-enable unit test for pxf and external storage


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/7c2f615d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/7c2f615d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/7c2f615d

Branch: refs/heads/master
Commit: 7c2f615d207f342218b5c77832875cbb03301306
Parents: 6d743f1
Author: Ruilong Huo <rhuo@pivotal.io>
Authored: Mon Nov 2 14:13:00 2015 +0800
Committer: Ruilong Huo <rhuo@pivotal.io>
Committed: Mon Nov 2 14:13:00 2015 +0800

----------------------------------------------------------------------
 src/backend/access/external/test/.gitignore     |   1 +
 src/backend/access/external/test/Makefile       |  66 ++
 src/backend/access/external/test/README.txt     |   6 +
 .../access/external/test/ha_config_mock.c       |  20 +
 .../access/external/test/ha_config_test.c       | 419 +++++++++++++
 ...k_mgr_allocate_fragments_to_datanodes_test.c | 597 ++++++++++++++++++
 ...ork_mgr_distribute_work_2_gp_segments_test.c | 623 +++++++++++++++++++
 ...ork_mgr_do_segment_clustering_by_host_test.c |  99 +++
 .../access/external/test/hd_work_mgr_mock.c     | 100 +++
 .../access/external/test/hd_work_mgr_mock.h     |  84 +++
 .../access/external/test/hd_work_mgr_test.c     |  33 +
 .../access/external/test/pxffilters_test.c      | 556 +++++++++++++++++
 .../access/external/test/pxfheaders_test.c      | 223 +++++++
 .../access/external/test/pxfmasterapi_test.c    | 231 +++++++
 .../access/external/test/pxfuriparser_test.c    | 352 +++++++++++
 .../access/external/test_discard/.gitignore     |   1 -
 .../access/external/test_discard/Makefile       |  66 --
 .../access/external/test_discard/README.txt     |   6 -
 .../external/test_discard/ha_config_mock.c      |  20 -
 .../external/test_discard/ha_config_test.c      | 419 -------------
 ...k_mgr_allocate_fragments_to_datanodes_test.c | 597 ------------------
 ...ork_mgr_distribute_work_2_gp_segments_test.c | 621 ------------------
 ...ork_mgr_do_segment_clustering_by_host_test.c | 175 ------
 .../external/test_discard/hd_work_mgr_mock.c    |  53 --
 .../external/test_discard/hd_work_mgr_mock.h    |  54 --
 .../external/test_discard/hd_work_mgr_test.c    |  34 -
 .../external/test_discard/pxffilters_test.c     | 556 -----------------
 .../external/test_discard/pxfheaders_test.c     | 223 -------
 .../external/test_discard/pxfmasterapi_test.c   | 231 -------
 .../external/test_discard/pxfuriparser_test.c   | 352 -----------
 30 files changed, 3410 insertions(+), 3408 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/.gitignore
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/.gitignore b/src/backend/access/external/test/.gitignore
new file mode 100644
index 0000000..a8d6b6c
--- /dev/null
+++ b/src/backend/access/external/test/.gitignore
@@ -0,0 +1 @@
+*.t

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/Makefile b/src/backend/access/external/test/Makefile
new file mode 100644
index 0000000..e13d107
--- /dev/null
+++ b/src/backend/access/external/test/Makefile
@@ -0,0 +1,66 @@
+subdir=src/backend/access/external
+top_builddir=../../../../..
+
+TARGETS=pxfuriparser hd_work_mgr pxfheaders ha_config pxffilters pxfmasterapi
+
+# Objects from backend, which don't need to be mocked but need to be linked.
+COMMON_REAL_OBJS=\
+    $(top_srcdir)/src/backend/access/hash/hashfunc.o \
+    $(top_srcdir)/src/backend/bootstrap/bootparse.o \
+    $(top_srcdir)/src/backend/lib/stringinfo.o \
+    $(top_srcdir)/src/backend/nodes/bitmapset.o \
+    $(top_srcdir)/src/backend/nodes/equalfuncs.o \
+    $(top_srcdir)/src/backend/nodes/list.o \
+    $(top_srcdir)/src/backend/parser/gram.o \
+    $(top_srcdir)/src/backend/regex/regcomp.o \
+    $(top_srcdir)/src/backend/regex/regerror.o \
+    $(top_srcdir)/src/backend/regex/regexec.o \
+    $(top_srcdir)/src/backend/regex/regfree.o \
+    $(top_srcdir)/src/backend/storage/page/itemptr.o \
+    $(top_srcdir)/src/backend/utils/adt/datum.o \
+    $(top_srcdir)/src/backend/utils/adt/like.o \
+    $(top_srcdir)/src/backend/utils/error/elog.o \
+    $(top_srcdir)/src/backend/utils/hash/dynahash.o \
+    $(top_srcdir)/src/backend/utils/hash/hashfn.o \
+    $(top_srcdir)/src/backend/utils/init/globals.o \
+    $(top_srcdir)/src/backend/utils/mb/mbutils.o \
+    $(top_srcdir)/src/backend/utils/mb/wchar.o \
+    $(top_srcdir)/src/backend/utils/misc/guc.o \
+    $(top_srcdir)/src/port/exec.o \
+    $(top_srcdir)/src/port/path.o \
+    $(top_srcdir)/src/port/pgsleep.o \
+    $(top_srcdir)/src/port/pgstrcasecmp.o \
+    $(top_srcdir)/src/port/qsort.o \
+    $(top_srcdir)/src/port/strlcpy.o \
+    $(top_srcdir)/src/port/thread.o \
+    $(top_srcdir)/src/timezone/localtime.o \
+    $(top_srcdir)/src/timezone/pgtz.o \
+    $(top_srcdir)/src/timezone/strftime.o \
+    $(top_srcdir)/src/backend/utils/mmgr/redzone_handler.o \
+
+# Objects from backend, which don't need to be mocked but need to be linked.
+pxfuriparser_REAL_OBJS=$(COMMON_REAL_OBJS) \
+    $(top_srcdir)/src/backend/utils/adt/formatting.o \
+    $(top_srcdir)/src/backend/nodes/value.o \
+    $(top_srcdir)/src/backend/utils/adt/numutils.o \
+    $(top_srcdir)/src/backend/access/external/pxfutils.o
+hd_work_mgr_REAL_OBJS=$(COMMON_REAL_OBJS) \
+	$(top_srcdir)/src/backend/cdb/cdbutil.o \
+	$(top_srcdir)/src/backend/access/external/pxfuriparser.o \
+	$(top_srcdir)/src/backend/access/external/pxfutils.o
+
+# numutils for pg_ltoa
+pxfheaders_REAL_OBJS=$(COMMON_REAL_OBJS) \
+	$(top_srcdir)/src/backend/utils/adt/numutils.o
+ha_config_REAL_OBJS=$(COMMON_REAL_OBJS) \
+	$(top_srcdir)/src/backend/utils/fmgrtab.o \
+	$(top_srcdir)/src/backend/utils/adt/numutils.o
+pxfmasterapi_REAL_OBJS=$(COMMON_REAL_OBJS) \
+	$(top_srcdir)/src/backend/utils/fmgrtab.o	
+pxffilters_REAL_OBJS=$(COMMON_REAL_OBJS) \
+	$(top_srcdir)/src/backend/optimizer/util/clauses.o \
+	$(top_srcdir)/src/backend/parser/parse_expr.o    
+
+include ../../../../Makefile.mock
+
+MOCK_LIBS += -ljson

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/README.txt
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/README.txt b/src/backend/access/external/test/README.txt
new file mode 100644
index 0000000..75a03f0
--- /dev/null
+++ b/src/backend/access/external/test/README.txt
@@ -0,0 +1,6 @@
+Directory with the following System Under Test (SUT):
+ - pxfuriparser.c
+ - hd_work_mgr.c
+ - pxfheaders.c
+ - ha_config.c
+ - pxffilters.c

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/ha_config_mock.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/ha_config_mock.c b/src/backend/access/external/test/ha_config_mock.c
new file mode 100644
index 0000000..dcf2b12
--- /dev/null
+++ b/src/backend/access/external/test/ha_config_mock.c
@@ -0,0 +1,20 @@
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "hdfs/hdfs.h"
+
+/* Mock functions for the hdfs.h APIs used by load_nn_ha_config() in ha_config.c */
+
+Namenode * hdfsGetHANamenodes(const char *nameservice, int *size)
+{
+	optional_assignment(size);
+	return (Namenode *)mock();
+}
+
+void hdfsFreeNamenodeInformation(Namenode *namenodes, int size)
+{
+	mock();
+}
\ No newline at end of file
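
A note on the pattern: cmockery mocks like the two above are driven by will_return()/will_assign_value() calls that the test issues before invoking the system under test; mock() then pops the queued return value, and optional_assignment() writes the queued output parameter. A minimal sketch of that interplay, using a hypothetical test body (the real tests are in ha_config_test.c):

    /* Sketch only - assumes the cmockery macros and hdfs.h types used in this commit. */
    #include <stdarg.h>
    #include <stddef.h>
    #include <setjmp.h>
    #include "cmockery.h"
    #include "hdfs/hdfs.h"

    void
    test__sketch__mock_priming(void **state)
    {
        Namenode nns[2];
        int numn = 2;

        /* queue the pointer that mock() inside hdfsGetHANamenodes() will pop,
         * and the value that optional_assignment(size) will write through *size */
        will_return(hdfsGetHANamenodes, nns);
        will_assign_value(hdfsGetHANamenodes, size, numn);

        /* any code under test that now calls hdfsGetHANamenodes() receives nns
         * and sees *size == 2, without touching a real HDFS client */
    }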

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/ha_config_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/ha_config_test.c b/src/backend/access/external/test/ha_config_test.c
new file mode 100644
index 0000000..61cf1de
--- /dev/null
+++ b/src/backend/access/external/test/ha_config_test.c
@@ -0,0 +1,419 @@
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "../ha_config.c"
+#include "ha_config_mock.c"
+#include "lib/stringinfo.h"
+#include "utils/builtins.h"
+
+/* Helper functions signature */
+#define format_string_release(p) (pfree(p))
+
+static char *format_string_create(char *f,...);
+static void handle_pg_excp(char *msg, int errcode);
+
+/*
+ * Unit test for GPHD_HA_load_nodes() in ../access/external/ha_config.c
+ * GPHD_HA_load_nodes() discovers the active Namenode from an HA Namenode pair.
+ * It does this by interacting with the API exposed by hdfs.h, of which it uses
+ * 2 functions:
+ * a. Namenode * hdfsGetHANamenodes(const char * nameservice, int * size);
+ * b. void hdfsFreeNamenodeInformation(Namenode * namenodes, int size);
+ * This unit test verifies the correct interaction between the GPHD_HA_load_nodes()
+ * implementation and the 2 hdfs.h APIs. It covers the standard flows with well-formed
+ * input configuration as well as edge cases with corrupted input configuration.
+ * The mock functions for the two hdfs.h APIs are in ha_config_mock.c.
+ */
+
+/*
+ * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice);
+ * Mock function: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size) 
+ * Negative test: GPHD_HA_load_nodes() receives a nonexistent nameservice
+ */
+void 
+test__GPHD_HA_load_nodes__UnknownNameservice(void **state)
+{
+	/* 
+	 * In case it receives an unknown nameservice string, the real function hdfsGetHANamenodes()
+	 * will return NULL. We instruct our mock function to return NULL. In this way we simulate
+	 * an unknown-service scenario and verify that our SUT function GPHD_HA_load_nodes()
+	 * correctly handles the NULL returned by hdfsGetHANamenodes.
+	 */
+	will_return(hdfsGetHANamenodes, NULL);
+	
+	PG_TRY();
+	{
+		NNHAConf *hac = GPHD_HA_load_nodes("UNKNOWN_SERVICE");
+	}
+	PG_CATCH();
+	{
+		char *msg = "nameservice UNKNOWN_SERVICE not found in client configuration. No HA namenodes provided";
+		handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR);
+		return;
+	}
+	PG_END_TRY();
+	
+	assert_true(false);
+}
+
+/*
+ * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice);
+ * Mock function: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size) 
+ * Negative test: hdfsGetHANamenodes() returns just one namenode.
+ * This is not a sustainable HA configuration.
+ */
+void 
+test__GPHD_HA_load_nodes__OneNN(void **state)
+{
+	unsigned int numn = 1;
+	Namenode nns[1];
+	
+	will_return(hdfsGetHANamenodes, nns);
+	will_assign_value(hdfsGetHANamenodes, size, numn);
+	
+
+	PG_TRY();
+	{
+		NNHAConf *hac = GPHD_HA_load_nodes("NAMESERVICE");
+	}
+	PG_CATCH();
+	{
+		char *msg = format_string_create("High availability for nameservice %s was configured with only one node. A high availability scheme requires at least two nodes ",
+										 "NAMESERVICE");
+		handle_pg_excp(msg, ERRCODE_INTERNAL_ERROR);
+		format_string_release(msg); /* if we trip on assert_string_equal we don't free, but it doesn't matter because the process stops */
+		return;
+	}
+	PG_END_TRY();
+	
+	assert_true(false);
+	
+}
+
+/*
+ * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice);
+ * Mock function: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size) 
+ * Negative test: hdfsGetHANamenodes() returns a Namenode.rpc_address field
+ * that is missing ":", the host:port delimiter.
+ */
+void 
+test__GPHD_HA_load_nodes__RpcDelimMissing(void **state)
+{
+	unsigned int numn = 2;
+	Namenode nns[] = { {"mdw2080", "mdw:50070"}, {"smdw:2080", "smdw:50070"}};	
+	
+	will_return(hdfsGetHANamenodes, nns);
+	will_assign_value(hdfsGetHANamenodes, size, numn);
+		
+	PG_TRY();
+	{
+		NNHAConf *hac = GPHD_HA_load_nodes("NAMESERVICE");
+	}
+	PG_CATCH();
+	{
+		char *msg = "dfs.namenode.rpc-address was set incorrectly in the configuration. ':' missing";
+		handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR);
+		return;
+	}
+	PG_END_TRY();
+	
+	assert_true(false);
+	
+}
+
+/*
+ * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice)
+ * Mock functions: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size)
+ *                 void port_to_str(char **port, int new_port)
+ * Positive test: port_to_str() assigns pxf_service_port correctly
+ */
+void 
+test__GPHD_HA_load_nodes__PxfServicePortIsAssigned(void **state)
+{
+	unsigned int numn = 2;
+	Namenode nns[] = { {"mdw:2080", "mdw:50070"}, {"smdw:2080", "smdw:50070"}};
+	char strPort[30] = {0};
+	pg_ltoa(pxf_service_port, strPort);
+	
+	will_return(hdfsGetHANamenodes, nns);
+	will_assign_value(hdfsGetHANamenodes, size, numn);
+
+	will_be_called(port_to_str);
+	expect_not_value(port_to_str, port, NULL);
+	expect_value(port_to_str, new_port, pxf_service_port);
+	will_assign_string(port_to_str, port, strPort);
+
+	will_be_called(port_to_str);
+	expect_not_value(port_to_str, port, NULL);
+	expect_value(port_to_str, new_port, pxf_service_port);
+	will_assign_string(port_to_str, port, strPort);
+
+	will_be_called(hdfsFreeNamenodeInformation);
+		
+
+	NNHAConf *hac = GPHD_HA_load_nodes("NAMESERVICE");
+}
+
+/*
+ * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice)
+ * Mock functions: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size)
+ *                 void port_to_str(char **port, int new_port)
+ * Negative test: hdfsGetHANamenodes() returns a Namenode.rpc_address field
+ * that is missing the host part - only ":port" is present.
+ */
+void 
+test__GPHD_HA_load_nodes__HostMissing(void **state)
+{
+	unsigned int numn = 2;
+	Namenode nns[] = { {":2080", "mdw:50070"}, {"smdw:2080", "smdw:50070"}};
+	char strPort[30] = {0};
+	pg_ltoa(pxf_service_port, strPort);
+	
+	will_return(hdfsGetHANamenodes, nns);
+	will_assign_value(hdfsGetHANamenodes, size, numn);
+	
+	will_be_called(port_to_str);
+	expect_not_value(port_to_str, port, NULL);
+	expect_value(port_to_str, new_port, pxf_service_port);
+	will_assign_string(port_to_str, port, strPort);
+
+	will_be_called(port_to_str);
+	expect_not_value(port_to_str, port, NULL);
+	expect_value(port_to_str, new_port, pxf_service_port);
+	will_assign_string(port_to_str, port, strPort);
+
+	will_be_called(hdfsFreeNamenodeInformation);
+
+	PG_TRY();
+	{
+		NNHAConf *hac = GPHD_HA_load_nodes("NAMESERVICE");
+	}
+	PG_CATCH();
+	{
+		char *msg = "HA Namenode host number 1 is NULL value";
+		handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR);
+		return;
+	}
+	PG_END_TRY();
+}
+
+/*
+ * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice)
+ * Mock functions: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size)
+ *                 void port_to_str(char **port, int new_port)
+ * Negative test: port_to_str() does not set the port, leaving only
+ * "host:" without a port value.
+ */
+void 
+test__GPHD_HA_load_nodes__PortMissing(void **state)
+{
+	unsigned int numn = 2;
+	Namenode nns[] = { {"mdw:", "mdw:50070"}, {"smdw:2080", "smdw:50070"}};	
+	
+	will_return(hdfsGetHANamenodes, nns);
+	will_assign_value(hdfsGetHANamenodes, size, numn);
+
+	will_be_called(port_to_str);
+	expect_not_value(port_to_str, port, NULL);
+	expect_value(port_to_str, new_port, pxf_service_port);
+
+	will_be_called(port_to_str);
+	expect_not_value(port_to_str, port, NULL);
+	expect_value(port_to_str, new_port, pxf_service_port);
+
+	will_be_called(hdfsFreeNamenodeInformation);
+	
+	PG_TRY();
+	{
+		NNHAConf *hac = GPHD_HA_load_nodes("NAMESERVICE");
+	}
+	PG_CATCH();
+	{
+		char *msg = "HA Namenode RPC port number 1 is NULL value";
+		handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR);
+		return;
+	}
+	PG_END_TRY();
+}
+
+/*
+ * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice)
+ * Mock functions: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size)
+ *                 void port_to_str(char **port, int new_port)
+ * Negative test: port_to_str() returns a port outside the valid range
+ *  - a number higher than 65535
+ */
+void 
+test__GPHD_HA_load_nodes__PortIsInvalidNumber(void **state)
+{
+	unsigned int numn = 2;
+	Namenode nns[] = { {"mdw:2080", "mdw:65550"}, {"smdw:2080", "smdw:50070"}};	
+	
+	will_return(hdfsGetHANamenodes, nns);
+	will_assign_value(hdfsGetHANamenodes, size, numn);
+
+	will_be_called(port_to_str);
+	expect_not_value(port_to_str, port, NULL);
+	expect_value(port_to_str, new_port, pxf_service_port);
+	will_assign_string(port_to_str, port, "65550");
+
+	will_be_called(port_to_str);
+	expect_not_value(port_to_str, port, NULL);
+	expect_value(port_to_str, new_port, pxf_service_port);
+	will_assign_string(port_to_str, port, "65550");
+
+	will_be_called(hdfsFreeNamenodeInformation);
+	
+	PG_TRY();
+	{
+		NNHAConf *hac = GPHD_HA_load_nodes("NAMESERVICE");
+	}
+	PG_CATCH();
+	{
+		char *msg = "Invalid port <65550> detected in nameservice configuration";				
+		handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR);		
+		return;
+	}
+	PG_END_TRY();
+}
+
+/*
+ * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice)
+ * Mock functions: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size)
+ *                 void port_to_str(char **port, int new_port)
+ * Negative test: port_to_str() returns a port that is not a number
+ */
+void 
+test__GPHD_HA_load_nodes__PortIsNotNumber_TakeOne(void **state)
+{
+	NNHAConf *hac;
+	unsigned int numn = 2;
+	Namenode nns[] = { {"mdw:2080", "mdw:50070"}, {"smdw:2080", "smdw:50070"}};
+	
+	will_return(hdfsGetHANamenodes, nns);
+	will_assign_value(hdfsGetHANamenodes, size, numn);
+
+	will_be_called(port_to_str);
+	expect_not_value(port_to_str, port, NULL);
+	expect_value(port_to_str, new_port, pxf_service_port);
+	will_assign_string(port_to_str, port, "melon");
+
+	will_be_called(port_to_str);
+	expect_not_value(port_to_str, port, NULL);
+	expect_value(port_to_str, new_port, pxf_service_port);
+
+	will_be_called(hdfsFreeNamenodeInformation);
+	
+	PG_TRY();
+	{
+		hac = GPHD_HA_load_nodes("NAMESERVICE");
+	}
+	PG_CATCH();
+	{
+		char *msg = "Invalid port <melon> detected in nameservice configuration";				
+		handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR);		
+		return;
+	}
+	PG_END_TRY();
+}
+
+/*
+ * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice)
+ * Mock functions: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size)
+ *                 void port_to_str(char **port, int new_port)
+ * Negative test: port_to_str() returns a port that is not a number
+ */
+void 
+test__GPHD_HA_load_nodes__PortIsNotNumber_TakeTwo(void **state)
+{
+	NNHAConf *hac;
+	unsigned int numn = 2;
+	Namenode nns[] = { {"mdw:2080", "mdw:50070"}, {"smdw:2080", "smdw:50070"}};
+	
+	will_return(hdfsGetHANamenodes, nns);
+	will_assign_value(hdfsGetHANamenodes, size, numn);
+
+	will_be_called(port_to_str);
+	expect_not_value(port_to_str, port, NULL);
+	expect_value(port_to_str, new_port, pxf_service_port);
+	will_assign_string(port_to_str, port, "100ab");
+
+	will_be_called(port_to_str);
+	expect_not_value(port_to_str, port, NULL);
+	expect_value(port_to_str, new_port, pxf_service_port);
+
+	will_be_called(hdfsFreeNamenodeInformation);
+	
+	PG_TRY();
+	{
+		hac = GPHD_HA_load_nodes("NAMESERVICE");
+	}
+	PG_CATCH();
+	{
+		char *msg = "Invalid port <100ab> detected in nameservice configuration";				
+		handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR);		
+		return;
+	}
+	PG_END_TRY();
+}
+
+int 
+main(int argc, char *argv[]) 
+{
+	cmockery_parse_arguments(argc, argv);
+
+	const UnitTest tests[] = {
+		    unit_test(test__GPHD_HA_load_nodes__UnknownNameservice),
+            unit_test(test__GPHD_HA_load_nodes__OneNN),
+		    unit_test(test__GPHD_HA_load_nodes__RpcDelimMissing),
+		    unit_test(test__GPHD_HA_load_nodes__PxfServicePortIsAssigned),
+		    unit_test(test__GPHD_HA_load_nodes__HostMissing),
+		    unit_test(test__GPHD_HA_load_nodes__PortMissing),
+		    unit_test(test__GPHD_HA_load_nodes__PortIsInvalidNumber),
+		    unit_test(test__GPHD_HA_load_nodes__PortIsNotNumber_TakeOne),
+		    unit_test(test__GPHD_HA_load_nodes__PortIsNotNumber_TakeTwo)
+	};
+	return run_tests(tests);
+}
+
+/*
+ * Helper function to format strings that need to be passed to assert macros
+ */
+static char*
+format_string_create(char *f,...)
+{
+	StringInfoData s;
+	va_list vl;
+	
+	initStringInfo(&s);
+	
+	va_start(vl,f);
+	appendStringInfoVA(&s, f, vl);
+	va_end(vl);
+	
+	return s.data;
+}
+
+/*
+ * Encapsulates exception unpackaging
+ */
+static void
+handle_pg_excp(char *msg, int errcode)
+{
+	/* fake a non-NULL CurrentMemoryContext so CopyErrorData() can be called */
+	CurrentMemoryContext = 1;
+	ErrorData *edata = CopyErrorData();
+	
+	assert_true(edata->sqlerrcode == errcode);
+	assert_true(edata->elevel == ERROR);
+	assert_string_equal(edata->message, msg);
+	
+	/* Clean the internal error data stack. Otherwise errordata_stack_depth in elog.c
+	 * keeps growing from test to test with each ereport we issue in our SUT function,
+	 * until we reach errordata_stack_depth >= ERRORDATA_STACK_SIZE and our tests
+	 * start failing.
+	 */
+	elog_dismiss(INFO);
+}
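
One structural remark on the tests above: every negative test follows the same PG_TRY idiom, where falling through the SUT call means the expected ereport(ERROR) never fired. Distilled to its skeleton (a sketch, not part of the commit; the message and error code vary per test):

    PG_TRY();
    {
        /* the SUT call that is expected to ereport(ERROR) */
        GPHD_HA_load_nodes("NAMESERVICE");
    }
    PG_CATCH();
    {
        /* handle_pg_excp() asserts on sqlerrcode/elevel/message via CopyErrorData(),
         * then elog_dismiss(INFO) pops the error stack so errordata_stack_depth
         * doesn't reach ERRORDATA_STACK_SIZE over the course of the run */
        handle_pg_excp("expected error message", ERRCODE_SYNTAX_ERROR);
        return;
    }
    PG_END_TRY();

    /* reaching this line means no error was raised - fail the test */
    assert_true(false);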

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/hd_work_mgr_allocate_fragments_to_datanodes_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_allocate_fragments_to_datanodes_test.c b/src/backend/access/external/test/hd_work_mgr_allocate_fragments_to_datanodes_test.c
new file mode 100644
index 0000000..fb42ee1
--- /dev/null
+++ b/src/backend/access/external/test/hd_work_mgr_allocate_fragments_to_datanodes_test.c
@@ -0,0 +1,597 @@
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "hd_work_mgr_mock.h"
+
+
+/*
+ * check that datanode dn has the expected
+ * ip, port, dn blocks list, and number of read & residing fragments
+ */
+void check_dn_load(DatanodeProcessingLoad* dn,
+				   const char* expected_hostip,
+				   int expected_port,
+				   List* expected_blocks,
+				   int expected_read_fragments,
+				   int expected_residing_fragments)
+{
+
+	ListCell* cell = NULL;
+	int blocks_number = 0;
+
+	assert_string_equal(dn->dataNodeIp, expected_hostip);
+	assert_int_equal(dn->port, expected_port);
+	assert_int_equal(dn->num_fragments_read, expected_read_fragments);
+	assert_int_equal(dn->num_fragments_residing, expected_residing_fragments);
+
+	assert_int_equal(list_length(expected_blocks), list_length(dn->datanodeBlocks));
+	blocks_number = list_length(expected_blocks);
+	for (int i = 0; i < blocks_number; ++i)
+	{
+		AllocatedDataFragment* block =
+				(AllocatedDataFragment*)lfirst(list_nth_cell(dn->datanodeBlocks, i));
+		AllocatedDataFragment* expected_block =
+				(AllocatedDataFragment*)lfirst(list_nth_cell(expected_blocks, i));
+		assert_string_equal(block->host, expected_block->host);
+		assert_int_equal(block->rest_port, expected_block->rest_port);
+		assert_string_equal(block->source_name, expected_block->source_name);
+		assert_string_equal(block->fragment_md, expected_block->fragment_md);
+		assert_string_equal(block->user_data, expected_block->user_data);
+		assert_int_equal(block->index, expected_block->index);
+	}
+
+}
+
+/*
+ * Test get_dn_processing_load:
+ * starting with an empty list, add fragment location
+ * and check that the returned element equals the given fragment
+ * and that if the location is new a new element was added to the list.
+ */
+void 
+test__get_dn_processing_load(void **state)
+{
+	List* allDNProcessingLoads = NIL;
+	ListCell* cell = NULL;
+	FragmentHost *fragment_loc = NULL;
+	DatanodeProcessingLoad* dn_found = NULL;
+
+	char* ip_array[] =
+	{
+		"1.2.3.1",
+		"1.2.3.2",
+		"1.2.3.3",
+		"1.2.3.1",
+		"1.2.3.1",
+		"1.2.3.3",
+		"1.2.3.1"
+	};
+	int port_array[] =
+	{
+		100,
+		100,
+		100,
+		100,
+		100,
+		100,
+		200
+	};
+	int expected_list_size[] =
+	{
+		1, 2, 3, 3, 3, 3, 4
+	};
+	int array_size = 7;
+	/* sanity */
+	assert_int_equal(array_size, (sizeof(ip_array) / sizeof(ip_array[0])));
+	assert_int_equal(array_size, (sizeof(port_array) / sizeof(port_array[0])));
+	assert_int_equal(array_size, (sizeof(expected_list_size) / sizeof(expected_list_size[0])));
+
+	fragment_loc = (FragmentHost*) palloc0(sizeof(FragmentHost));
+
+	for (int i = 0; i < array_size; ++i)
+	{
+		dn_found = NULL;
+		fragment_loc->ip = ip_array[i];
+		fragment_loc->rest_port = port_array[i];
+
+		dn_found = get_dn_processing_load(&allDNProcessingLoads, fragment_loc);
+
+		assert_true(dn_found != NULL);
+		assert_int_equal(list_length(allDNProcessingLoads), expected_list_size[i]);
+
+		check_dn_load(dn_found, ip_array[i], port_array[i], NIL, 0, 0);
+	}
+
+	pfree(fragment_loc);
+	/* free allDNProcessingLoads */
+	foreach(cell, allDNProcessingLoads)
+	{
+		DatanodeProcessingLoad* cell_data = (DatanodeProcessingLoad*)lfirst(cell);
+		pfree(cell_data->dataNodeIp);
+	}
+	list_free(allDNProcessingLoads);
+}
+
+/*
+ * create, verify and free AllocatedDataFragment
+ */
+void check_create_allocated_fragment(DataFragment* fragment, bool has_user_data)
+{
+	AllocatedDataFragment* allocated = create_allocated_fragment(fragment);
+
+	assert_true(allocated != NULL);
+	assert_int_equal(allocated->index, fragment->index);
+	assert_string_equal(allocated->source_name, fragment->source_name);
+	assert_string_equal(allocated->fragment_md, fragment->fragment_md);
+	if (has_user_data)
+		assert_string_equal(allocated->user_data, fragment->user_data);
+	else
+		assert_true(allocated->user_data == NULL);
+	assert_true(allocated->host == NULL);
+	assert_int_equal(allocated->rest_port, 0);
+
+	if (allocated->source_name)
+		pfree(allocated->source_name);
+	if (allocated->user_data)
+		pfree(allocated->user_data);
+	pfree(allocated);
+}
+
+
+/*
+ * Test create_allocated_fragment without user_data.
+ */
+void
+test__create_allocated_fragment__NoUserData(void **state)
+{
+	AllocatedDataFragment* allocated = NULL;
+
+	DataFragment* fragment = (DataFragment*) palloc0(sizeof(DataFragment));
+	fragment->index = 13;
+	fragment->source_name = "source name!!!";
+	fragment->user_data = NULL;
+	fragment->fragment_md = "METADATA";
+
+	check_create_allocated_fragment(fragment, false);
+
+	pfree(fragment);
+}
+
+/*
+ * Test create_allocated_fragment with user_data.
+ */
+void
+test__create_allocated_fragment__WithUserData(void **state)
+{
+	AllocatedDataFragment* allocated = NULL;
+
+	DataFragment* fragment = (DataFragment*) palloc0(sizeof(DataFragment));
+	fragment->index = 13;
+	fragment->source_name = "source name!!!";
+	fragment->user_data = "Wish You Were Here";
+	fragment->fragment_md = "METADATA";
+
+	check_create_allocated_fragment(fragment, true);
+
+	pfree(fragment);
+}
+
+/*
+ * Creates a list of DataFragment for one file ("file.txt").
+ * The important thing here is the fragments' location. It is determined by the parameters:
+ * replication_factor - number of copies of each fragment on the different hosts.
+ * number_of_hosts - number of hosts, so that the IP pool we use is 1.2.3.{1-number_of_hosts}
+ * number_of_fragments - number of fragments in the file.
+ *
+ * Each fragment will have <replication_factor> hosts,
+ * starting from IP 1.2.3.<fragment_number> to IP 1.2.3.<fragment_number + replication_factor> modulo <number_of_hosts>.
+ * That way there is some overlap between the hosts of each fragment.
+ */
+List* build_data_fragments_list(int number_of_fragments, int number_of_hosts, int replication_factor)
+{
+	List* fragments_list = NIL;
+	StringInfoData string_info;
+	initStringInfo(&string_info);
+
+	for (int i = 0; i < number_of_fragments; ++i)
+	{
+		DataFragment* fragment = (DataFragment*) palloc0(sizeof(DataFragment));
+
+		fragment->index = i;
+		fragment->source_name = pstrdup("file.txt");
+
+		for (int j = 0; j < replication_factor; ++j)
+		{
+			FragmentHost* fhost = (FragmentHost*)palloc0(sizeof(FragmentHost));
+			appendStringInfo(&string_info, "1.2.3.%d", ((j + i) % number_of_hosts) + 1);
+			fhost->ip = pstrdup(string_info.data);
+			resetStringInfo(&string_info);
+			fragment->replicas = lappend(fragment->replicas, fhost);
+		}
+		assert_int_equal(list_length(fragment->replicas), replication_factor);
+		appendStringInfo(&string_info, "metadata %d", i);
+		fragment->fragment_md = pstrdup(string_info.data);
+		resetStringInfo(&string_info);
+		appendStringInfo(&string_info, "user data %d", i);
+		fragment->user_data = pstrdup(string_info.data);
+		resetStringInfo(&string_info);
+		fragments_list = lappend(fragments_list, fragment);
+	}
+
+	pfree(string_info.data);
+	return fragments_list;
+}
+
+/*
+ * Returns a list of AllocatedDataFragment.
+ */
+List* build_blank_blocks_list(int list_size)
+{
+	List* blocks_list = NIL;
+	for (int i = 0; i < list_size; ++i)
+	{
+		AllocatedDataFragment* block = (AllocatedDataFragment*) palloc0(sizeof(AllocatedDataFragment));
+		blocks_list = lappend(blocks_list, block);
+	}
+	return blocks_list;
+}
+
+/*
+ * Tests allocate_fragments_to_datanodes with 4 fragments over 10 hosts, with 3 replicas each,
+ * so that they are distributed as follows:
+ * Fragment 0 : on hosts 1.2.3.1, 1.2.3.2, 1.2.3.3.
+ * Fragment 1 : on hosts 1.2.3.2, 1.2.3.3, 1.2.3.4.
+ * Fragment 2 : on hosts 1.2.3.3, 1.2.3.4, 1.2.3.5.
+ * Fragment 3 : on hosts 1.2.3.4, 1.2.3.5, 1.2.3.6.
+ *
+ * The expected distribution is as follows:
+ * Host 1.2.3.1: fragment 0.
+ * Host 1.2.3.2: fragment 1.
+ * Host 1.2.3.3: fragment 2.
+ * Host 1.2.3.4: fragment 3.
+ * Hosts 1.2.3.5, 1.2.3.6: no fragments.
+ */
+void
+test__allocate_fragments_to_datanodes__4Fragments10Hosts3Replicates(void **state)
+{
+	List* allDNProcessingLoads = NIL;
+	List* data_fragment_list = NIL;
+	List* blocks_list = NIL;
+	ListCell* cell = NULL;
+	DatanodeProcessingLoad* datanode_load = NULL;
+	AllocatedDataFragment* allocated = NULL;
+
+	data_fragment_list = build_data_fragments_list(4, 10, 3);
+	assert_int_equal(4, list_length(data_fragment_list));
+
+	allDNProcessingLoads = allocate_fragments_to_datanodes(data_fragment_list);
+
+	assert_true(allDNProcessingLoads != NULL);
+	assert_int_equal(6, list_length(allDNProcessingLoads));
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 0));
+	blocks_list = build_blank_blocks_list(1);
+	allocated = lfirst(list_nth_cell(blocks_list, 0));
+	allocated->host = "1.2.3.1";
+	allocated->index = 0;
+	allocated->rest_port = 0;
+	allocated->source_name = "file.txt";
+	allocated->fragment_md = "metadata 0";
+	allocated->user_data = "user data 0";
+	check_dn_load(datanode_load,
+			      "1.2.3.1", 0, blocks_list, 1, 1);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 1));
+	allocated->host = "1.2.3.2";
+	allocated->index = 1;
+	allocated->fragment_md = "metadata 1";
+	allocated->user_data = "user data 1";
+	check_dn_load(datanode_load,
+				  "1.2.3.2", 0, blocks_list, 1, 2);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 2));
+	allocated->host = "1.2.3.3";
+	allocated->index = 2;
+	allocated->fragment_md = "metadata 2";
+	allocated->user_data = "user data 2";
+	check_dn_load(datanode_load,
+				  "1.2.3.3", 0, blocks_list, 1, 3);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 3));
+	allocated->host = "1.2.3.4";
+	allocated->index = 3;
+	allocated->fragment_md = "metadata 3";
+	allocated->user_data = "user data 3";
+	check_dn_load(datanode_load,
+				  "1.2.3.4", 0, blocks_list, 1, 3);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 4));
+	check_dn_load(datanode_load,
+				  "1.2.3.5", 0, NIL, 0, 2);
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 5));
+	check_dn_load(datanode_load,
+				  "1.2.3.6", 0, NIL, 0, 1);
+
+	/* cleanup */
+	list_free(blocks_list);
+	foreach (cell, allDNProcessingLoads)
+	{
+		DatanodeProcessingLoad* dn = (DatanodeProcessingLoad*)lfirst(cell);
+		free_allocated_frags(dn->datanodeBlocks);
+	}
+	list_free(allDNProcessingLoads);
+}
+
+/*
+ * Tests allocate_fragments_to_datanodes with 4 fragments over 3 hosts, with 2 replicas each,
+ * so that they are distributed as follows:
+ * Fragment 0 : on hosts 1.2.3.1, 1.2.3.2.
+ * Fragment 1 : on hosts 1.2.3.2, 1.2.3.3.
+ * Fragment 2 : on hosts 1.2.3.3, 1.2.3.1.
+ * Fragment 3 : on hosts 1.2.3.1, 1.2.3.2.
+ *
+ * The expected distribution is as follows:
+ * Host 1.2.3.1: fragments 0, 3.
+ * Host 1.2.3.2: fragment 1.
+ * Host 1.2.3.3: fragment 2.
+ */
+void
+test__allocate_fragments_to_datanodes__4Fragments3Hosts2Replicates(void **state)
+{
+	List* allDNProcessingLoads = NIL;
+	List* data_fragment_list = NIL;
+	List* blocks_list = NIL;
+	ListCell* cell = NULL;
+	DatanodeProcessingLoad* datanode_load = NULL;
+	AllocatedDataFragment* allocated = NULL;
+
+	data_fragment_list = build_data_fragments_list(4, 3, 2);
+	assert_int_equal(4, list_length(data_fragment_list));
+
+	allDNProcessingLoads = allocate_fragments_to_datanodes(data_fragment_list);
+
+	assert_true(allDNProcessingLoads != NULL);
+	assert_int_equal(3, list_length(allDNProcessingLoads));
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 0));
+	blocks_list = build_blank_blocks_list(2);
+	allocated = lfirst(list_nth_cell(blocks_list, 0));
+	allocated->host = "1.2.3.1";
+	allocated->index = 0;
+	allocated->rest_port = 0;
+	allocated->source_name = "file.txt";
+	allocated->fragment_md = "metadata 0";
+	allocated->user_data = "user data 0";
+	allocated = lfirst(list_nth_cell(blocks_list, 1));
+	allocated->host = "1.2.3.1";
+	allocated->index = 3;
+	allocated->rest_port = 0;
+	allocated->source_name = "file.txt";
+	allocated->fragment_md = "metadata 3";
+	allocated->user_data = "user data 3";
+	check_dn_load(datanode_load,
+			      "1.2.3.1", 0, blocks_list, 2, 3);
+	list_free(blocks_list);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 1));
+	blocks_list = build_blank_blocks_list(1);
+	allocated = lfirst(list_nth_cell(blocks_list, 0));
+	allocated->host = "1.2.3.2";
+	allocated->index = 1;
+	allocated->rest_port = 0;
+	allocated->source_name = "file.txt";
+	allocated->fragment_md = "metadata 1";
+	allocated->user_data = "user data 1";
+	check_dn_load(datanode_load,
+				  "1.2.3.2", 0, blocks_list, 1, 3);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 2));
+	allocated->host = "1.2.3.3";
+	allocated->index = 2;
+	allocated->fragment_md = "metadata 2";
+	allocated->user_data = "user data 2";
+	check_dn_load(datanode_load,
+				  "1.2.3.3", 0, blocks_list, 1, 2);
+
+	/* cleanup */
+	list_free(blocks_list);
+	foreach (cell, allDNProcessingLoads)
+	{
+		DatanodeProcessingLoad* dn = (DatanodeProcessingLoad*)lfirst(cell);
+		free_allocated_frags(dn->datanodeBlocks);
+	}
+	list_free(allDNProcessingLoads);
+}
+
+/*
+ * Tests allocate_fragments_to_datanodes with 4 fragments over 3 hosts, with 1 replica each,
+ * so that they are distributed as follows:
+ * Fragment 0 : on hosts 1.2.3.1.
+ * Fragment 1 : on hosts 1.2.3.2.
+ * Fragment 2 : on hosts 1.2.3.3.
+ * Fragment 3 : on hosts 1.2.3.1.
+ *
+ * The expected distribution is as follows:
+ * Host 1.2.3.1: fragments 0, 3.
+ * Host 1.2.3.2: fragment 1.
+ * Host 1.2.3.3: fragment 2.
+ */
+void
+test__allocate_fragments_to_datanodes__4Fragments3Hosts1Replicates(void **state)
+{
+	List* allDNProcessingLoads = NIL;
+	List* data_fragment_list = NIL;
+	List* blocks_list = NIL;
+	ListCell* cell = NULL;
+	DatanodeProcessingLoad* datanode_load = NULL;
+	AllocatedDataFragment* allocated = NULL;
+
+	data_fragment_list = build_data_fragments_list(4, 3, 1);
+	assert_int_equal(4, list_length(data_fragment_list));
+
+	allDNProcessingLoads = allocate_fragments_to_datanodes(data_fragment_list);
+
+	assert_true(allDNProcessingLoads != NULL);
+	assert_int_equal(3, list_length(allDNProcessingLoads));
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 0));
+	blocks_list = build_blank_blocks_list(2);
+	allocated = lfirst(list_nth_cell(blocks_list, 0));
+	allocated->host = "1.2.3.1";
+	allocated->index = 0;
+	allocated->rest_port = 0;
+	allocated->source_name = "file.txt";
+	allocated->fragment_md = "metadata 0";
+	allocated->user_data = "user data 0";
+	allocated = lfirst(list_nth_cell(blocks_list, 1));
+	allocated->host = "1.2.3.1";
+	allocated->index = 3;
+	allocated->rest_port = 0;
+	allocated->source_name = "file.txt";
+	allocated->fragment_md = "metadata 3";
+	allocated->user_data = "user data 3";
+	check_dn_load(datanode_load,
+			      "1.2.3.1", 0, blocks_list, 2, 2);
+	list_free(blocks_list);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 1));
+	blocks_list = build_blank_blocks_list(1);
+	allocated = lfirst(list_nth_cell(blocks_list, 0));
+	allocated->host = "1.2.3.2";
+	allocated->index = 1;
+	allocated->rest_port = 0;
+	allocated->source_name = "file.txt";
+	allocated->fragment_md = "metadata 1";
+	allocated->user_data = "user data 1";
+	check_dn_load(datanode_load,
+				  "1.2.3.2", 0, blocks_list, 1, 1);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 2));
+	allocated->host = "1.2.3.3";
+	allocated->index = 2;
+	allocated->fragment_md = "metadata 2";
+	allocated->user_data = "user data 2";
+	check_dn_load(datanode_load,
+				  "1.2.3.3", 0, blocks_list, 1, 1);
+
+	/* cleanup */
+	list_free(blocks_list);
+	foreach (cell, allDNProcessingLoads)
+	{
+		DatanodeProcessingLoad* dn = (DatanodeProcessingLoad*)lfirst(cell);
+		free_allocated_frags(dn->datanodeBlocks);
+	}
+	list_free(allDNProcessingLoads);
+}
+
+/*
+ * Tests allocate_fragments_to_datanodes with 7 fragments over 10 hosts, with 1 replica each,
+ * so that they are distributed as follows:
+ * Fragment 0 : on hosts 1.2.3.1.
+ * Fragment 1 : on hosts 1.2.3.2.
+ * Fragment 2 : on hosts 1.2.3.3.
+ * Fragment 3 : on hosts 1.2.3.4.
+ * Fragment 4 : on hosts 1.2.3.5.
+ * Fragment 5 : on hosts 1.2.3.6.
+ * Fragment 6 : on hosts 1.2.3.7.
+ *
+ * The expected distribution is as follows:
+ * Host 1.2.3.1: fragment 0.
+ * Host 1.2.3.2: fragment 1.
+ * Host 1.2.3.3: fragment 2.
+ * Host 1.2.3.4: fragment 3.
+ * Host 1.2.3.5: fragment 4.
+ * Host 1.2.3.6: fragment 5.
+ * Host 1.2.3.7: fragment 6.
+ * Hosts 1.2.3.8, 1.2.3.9, 1.2.3.10: no fragments.
+ */
+void
+test__allocate_fragments_to_datanodes__7Fragments10Hosts1Replicates(void **state)
+{
+	List* allDNProcessingLoads = NIL;
+	List* data_fragment_list = NIL;
+	List* blocks_list = NIL;
+	ListCell* cell = NULL;
+	DatanodeProcessingLoad* datanode_load = NULL;
+	AllocatedDataFragment* allocated = NULL;
+
+	data_fragment_list = build_data_fragments_list(7, 10, 1);
+	assert_int_equal(7, list_length(data_fragment_list));
+
+	allDNProcessingLoads = allocate_fragments_to_datanodes(data_fragment_list);
+
+	assert_true(allDNProcessingLoads != NULL);
+	assert_int_equal(7, list_length(allDNProcessingLoads));
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 0));
+	blocks_list = build_blank_blocks_list(1);
+	allocated = lfirst(list_nth_cell(blocks_list, 0));
+	allocated->host = "1.2.3.1";
+	allocated->index = 0;
+	allocated->rest_port = 0;
+	allocated->source_name = "file.txt";
+	allocated->fragment_md = "metadata 0";
+	allocated->user_data = "user data 0";
+	check_dn_load(datanode_load,
+			      "1.2.3.1", 0, blocks_list, 1, 1);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 1));
+	allocated->host = "1.2.3.2";
+	allocated->index = 1;
+	allocated->fragment_md = "metadata 1";
+	allocated->user_data = "user data 1";
+	check_dn_load(datanode_load,
+				  "1.2.3.2", 0, blocks_list, 1, 1);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 2));
+	allocated->host = "1.2.3.3";
+	allocated->index = 2;
+	allocated->fragment_md = "metadata 2";
+	allocated->user_data = "user data 2";
+	check_dn_load(datanode_load,
+			"1.2.3.3", 0, blocks_list, 1, 1);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 3));
+	allocated->host = "1.2.3.4";
+	allocated->index = 3;
+	allocated->fragment_md = "metadata 3";
+	allocated->user_data = "user data 3";
+	check_dn_load(datanode_load,
+			"1.2.3.4", 0, blocks_list, 1, 1);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 4));
+	allocated->host = "1.2.3.5";
+	allocated->index = 4;
+	allocated->fragment_md = "metadata 4";
+	allocated->user_data = "user data 4";
+	check_dn_load(datanode_load,
+			"1.2.3.5", 0, blocks_list, 1, 1);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 5));
+	allocated->host = "1.2.3.6";
+	allocated->index = 5;
+	allocated->fragment_md = "metadata 5";
+	allocated->user_data = "user data 5";
+	check_dn_load(datanode_load,
+			"1.2.3.6", 0, blocks_list, 1, 1);
+
+	datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 6));
+	allocated->host = "1.2.3.7";
+	allocated->index = 6;
+	allocated->fragment_md = "metadata 6";
+	allocated->user_data = "user data 6";
+	check_dn_load(datanode_load,
+			"1.2.3.7", 0, blocks_list, 1, 1);
+
+	/* cleanup */
+	list_free(blocks_list);
+	foreach (cell, allDNProcessingLoads)
+	{
+		DatanodeProcessingLoad* dn = (DatanodeProcessingLoad*)lfirst(cell);
+		free_allocated_frags(dn->datanodeBlocks);
+	}
+	list_free(allDNProcessingLoads);
+}
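
The contract exercised by test__get_dn_processing_load above is a find-or-create over a PostgreSQL List: return the existing entry for a known ip/port pair, otherwise append a zeroed one. A sketch of that shape, inferred from the assertions alone (the real get_dn_processing_load() lives in hd_work_mgr.c and may differ):

    /* Inferred shape of get_dn_processing_load(), based on what the test asserts:
     * list length grows only when an unseen ip/port combination arrives. */
    static DatanodeProcessingLoad *
    get_dn_processing_load_sketch(List **allDNProcessingLoads, FragmentHost *fragment_loc)
    {
        ListCell *lc;
        DatanodeProcessingLoad *dn;

        foreach(lc, *allDNProcessingLoads)
        {
            dn = (DatanodeProcessingLoad *) lfirst(lc);
            if (dn->port == fragment_loc->rest_port &&
                strcmp(dn->dataNodeIp, fragment_loc->ip) == 0)
                return dn;              /* known datanode: list length unchanged */
        }

        /* first time we see this ip/port: append a zeroed entry */
        dn = (DatanodeProcessingLoad *) palloc0(sizeof(DatanodeProcessingLoad));
        dn->dataNodeIp = pstrdup(fragment_loc->ip);
        dn->port = fragment_loc->rest_port;
        *allDNProcessingLoads = lappend(*allDNProcessingLoads, dn);
        return dn;                      /* new datanode: list grows by one */
    }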

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/hd_work_mgr_distribute_work_2_gp_segments_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_distribute_work_2_gp_segments_test.c b/src/backend/access/external/test/hd_work_mgr_distribute_work_2_gp_segments_test.c
new file mode 100644
index 0000000..2b940ff
--- /dev/null
+++ b/src/backend/access/external/test/hd_work_mgr_distribute_work_2_gp_segments_test.c
@@ -0,0 +1,623 @@
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "hd_work_mgr_mock.h"
+
+
+/*
+ * In test__distribute_work_2_gp_segments we are testing the function distribute_work_2_gp_segments().
+ * distribute_work_2_gp_segments() implements the algorithm that allocates the fragments of an external
+ * data source to the Hawq segments for processing.
+ * This unit test verifies the algorithm's output and ensures the following behaviour:
+ * a. the number of fragments allocated equals the input number of fragments
+ * b. distribution of work between segments: if two segments are required and there is more than
+ *    one host, each segment will be on a different host
+ * c. the percentage of local datanodes out of all datanodes
+ * d. the percentage of local fragments out of all fragments
+ * e. the number of actually working segments is greater than half of the initial working segments
+ */
+
+static void print_allocated_fragments(List **allocated_fragments, int total_segs);
+static char* print_one_allocated_data_fragment(AllocatedDataFragment *frag, int seg_index);
+static char* find_segment_ip_by_index(int seg_index);
+static char** create_cluster(int num_hosts);
+static void clean_cluster(char** cluster, int num_hosts);
+static char** create_array_of_segs(char **cluster, int num_hosts, int num_segments_on_host);
+static void clean_array_of_segs(char **array_of_segs, int number_of_segments);
+static bool* create_array_of_primaries(int number_of_segments);
+static void print_cluster(char** cluster, int num_hosts);
+static void print_segments_list();
+void clean_allocated_fragments(List **allocated_fragments, int total_segs);
+static void validate_total_fragments_allocated(List **allocated_fragments, int total_segs, int input_total_fragments);
+static void validate_max_load_per_segment(List **allocated_fragments, int total_segs, int working_segs, int input_total_fragments);
+static int calc_load_per_segment(int input_total_fragments, int working_segs);
+static void validate_all_working_segments_engagement(List **allocated_fragments, 
+													 int total_segs, 
+													 int working_segs, 
+													 int input_total_fragments,
+													 int num_hosts_in_cluster);
+static bool is_host_uniq(List** ips_list, char* ip);
+static List* spread_fragments_in_cluster(int number_of_fragments, 
+								  int number_of_hosts, 
+								  int replication_factor, 
+								  char **cluster,
+								  int cluster_size);
+
+/* test input data*/
+typedef struct sTestInputData
+{
+	int m_num_hosts_in_cluster; /* cluster size mustn't exceed 65025 - see function create_cluster() */
+	int m_num_data_fragments; /* number of fragments in the data we intend to allocate between the hawq segments */
+	/* 
+	 * number of datanodes that hold the 'queried' data - there is one datanode
+	 * on each cluster host - so there are <num_hosts_in_cluster> datanodes
+	 */
+	int m_num_active_data_nodes; 
+	int m_num_of_fragment_replicas;
+	int m_num_segments_on_host;/* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */
+	/* 
+	 * the subset of Hawq segments that will do the processing  - not all the Hawqs segments 
+	 * in the cluster are involved.
+	 * This parameter plays the role of max_participants_allowed that is passed to map_hddata_2gp_segments()
+	 * in createplan.c
+	 */
+	int m_num_working_segs; 
+	bool m_enable_print_input_cluster;
+	bool m_enable_print_input_fragments;
+	bool m_enable_print_input_segments;
+	bool m_enable_print_allocated_fragments;	
+} TestInputData;
+
+static void test__distribute_work_to_gp_segments(TestInputData *input);
+/*
+ * TRACING CAPABILITIES
+ * The unit test validates the behaviour of the SUT function distribute_work_2_gp_segments() using
+ * the assert_XXX_... functions. But in order to understand the behaviour of the allocation algorithm
+ * it can be helpful to look at the various data structures involved. For this purpose we have
+ * several print functions:
+ * a. print_cluster(...)
+ * b. print_fragment_list(...)
+ * c. print_segments_list(...)
+ * d. print_allocated_fragments(...)
+ * All these trace functions have their output disabled by default. To enable the output of any
+ * print function, set the corresponding m_enable_print_... boolean in the TestInputData passed to
+ * test__distribute_work_to_gp_segments().
+ */
+
+
+void
+test__distribute_work_to_gp_segments__big_cluster_few_active_nodes(void **state)
+{
+	TestInputData *input = (TestInputData*)palloc0(sizeof(TestInputData));
+	
+	input->m_num_hosts_in_cluster = 1000; /* cluster size mustn't exceed 65025 - see function create_cluster() */
+	input->m_num_data_fragments = 100; /* number of fragments in the data we intend to allocate between the hawq segments */
+	input->m_num_active_data_nodes = 10; /* number of datanodes that hold the 'queried' data - there is one datanode on each cluster host - so there are <num_hosts_in_cluster> datanodes */
+	input->m_num_of_fragment_replicas = 3;
+	input->m_num_segments_on_host = 1;/* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */
+	input->m_num_working_segs = 64; /* the subset of Hawq segments that will do the processing - not all the Hawq segments in the cluster are involved */
+	input->m_enable_print_input_cluster = false;
+	input->m_enable_print_input_fragments = false;
+	input->m_enable_print_input_segments = false;
+	input->m_enable_print_allocated_fragments = false;
+
+	test__distribute_work_to_gp_segments(input);
+
+	pfree(input);
+}
+
+void
+test__distribute_work_to_gp_segments__big_cluster_many_active_nodes(void **state)
+{
+	TestInputData *input = (TestInputData*)palloc0(sizeof(TestInputData));
+	
+	input->m_num_hosts_in_cluster = 1000; /* cluster size mustn't exceed 65025 - see function create_cluster() */
+	input->m_num_data_fragments = 100; /* number of fragments in the data we intend to allocate between the hawq segments */
+	input->m_num_active_data_nodes = 100; /* number of datanodes that hold the 'queried' data - there is one datanode on each cluster host - so there are <num_hosts_in_cluster> datanodes */
+	input->m_num_of_fragment_replicas = 3;
+	input->m_num_segments_on_host = 4;/* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */
+	input->m_num_working_segs = 64; /* the subset of Hawq segments that will do the processing - not all the Hawq segments in the cluster are involved */
+	input->m_enable_print_input_cluster = false;
+	input->m_enable_print_input_fragments = false;
+	input->m_enable_print_input_segments = false;
+	input->m_enable_print_allocated_fragments = false;
+	
+	test__distribute_work_to_gp_segments(input);
+	pfree(input);
+}
+
+void
+test__distribute_work_to_gp_segments__small_cluster(void **state)
+{
+	TestInputData *input = (TestInputData*)palloc0(sizeof(TestInputData));
+	
+	input->m_num_hosts_in_cluster = 100; /* cluster size mustn't exceed 65025 - see function create_cluster() */
+	input->m_num_data_fragments = 100; /* number of fragments in the data we intend to allocate between the hawq segments */
+	input->m_num_active_data_nodes = 50; /* number of datanodes that hold the 'queried' data - there is one datanode on each cluster host - so there are <num_hosts_in_cluster> datanodes */
+	input->m_num_of_fragment_replicas = 3;
+	input->m_num_segments_on_host = 4;/* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */
+	input->m_num_working_segs = 64; /* the subset of Hawq segments that will do the processing - not all the Hawq segments in the cluster are involved */
+	input->m_enable_print_input_cluster = false;
+	input->m_enable_print_input_fragments = false;
+	input->m_enable_print_input_segments = false;
+	input->m_enable_print_allocated_fragments = false;
+	
+	test__distribute_work_to_gp_segments(input);
+	pfree(input);
+}
+
+void
+test__distribute_work_to_gp_segments__small_cluster_many_active_nodes(void **state)
+{
+	TestInputData *input = (TestInputData*)palloc0(sizeof(TestInputData));
+	
+	input->m_num_hosts_in_cluster = 100; /* cluster size mustn't exceed 65025 - see function create_cluster() */
+	input->m_num_data_fragments = 100; /* number of fragments in the data we intend to allocate between the hawq segments */
+	input->m_num_active_data_nodes = 90; /* number of datanodes that hold the 'queried' data - there is one datanode on each cluster host - so there are <num_hosts_in_cluster> datanodes */
+	input->m_num_of_fragment_replicas = 3;
+	input->m_num_segments_on_host = 4;/* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */
+	input->m_num_working_segs = 64; /* the subset of Hawq segments that will do the processing - not all the Hawq segments in the cluster are involved */
+	input->m_enable_print_input_cluster = false;
+	input->m_enable_print_input_fragments = false;
+	input->m_enable_print_input_segments = false;
+	input->m_enable_print_allocated_fragments = false;
+	
+	test__distribute_work_to_gp_segments(input);
+	pfree(input);
+}
+
+void
+test__distribute_work_to_gp_segments__small_cluster_few_replicas(void **state)
+{
+	TestInputData *input = (TestInputData*)palloc0(sizeof(TestInputData));
+	
+	input->m_num_hosts_in_cluster = 100; /* cluster size mustn't exceed 65025 - see function create_cluster() */
+	input->m_num_data_fragments = 100; /* number of fragments in the data we intend to allocate between the hawq segments */
+	input->m_num_active_data_nodes = 90; /* number of datanodes that hold the 'queried' data - there is one datanode on each cluster host - so there are <num_hosts_in_cluster> datanodes */
+	input->m_num_of_fragment_replicas = 2;
+	input->m_num_segments_on_host = 4;/* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */
+	input->m_num_working_segs = 64; /* the subset of Hawq segments that will do the processing - not all the Hawq segments in the cluster are involved */
+	input->m_enable_print_input_cluster = false;
+	input->m_enable_print_input_fragments = false;
+	input->m_enable_print_input_segments = false;
+	input->m_enable_print_allocated_fragments = false;
+	
+	test__distribute_work_to_gp_segments(input);
+	pfree(input);
+}
+
+/*
+ * Testing distribute_work_2_gp_segments
+ */
+static void test__distribute_work_to_gp_segments(TestInputData *input)
+{
+	List **segs_allocated_data = NULL;
+	List * input_fragments_list = NIL;
+	char** array_of_segs = NULL;
+	bool *array_of_primaries;
+	int total_segs;
+	bool cluster_size_not_exceeded = input->m_num_hosts_in_cluster <= 65025;
+	
+	assert_true(cluster_size_not_exceeded);
+	/*  
+	 * 1. Initialize the test input parameters
+	 * We are testing an N-host cluster. The size of the cluster is set in this section (section 1).
+	 * Basic test assumptions:
+	 * a. There is one datanode on each host in the cluster
+	 * b. There are Hawq segments on each host in the cluster.
+	 * c. There is an equal number of Hawq segments on each host - hardcoded in this section
+	 */
+	int num_hosts_in_cluster = input->m_num_hosts_in_cluster; /* cluster size mustn't exceed 65025 - see function create_cluster() */
+	int num_data_fragments = input->m_num_data_fragments; /* number of fragments in the data we intend to allocate between the hawq segments */
+	int num_active_data_nodes = input->m_num_active_data_nodes; /* number of datanodes that hold the 'queried' data - there is one datanode on each cluster host - so there are <num_hosts_in_cluster> datanodes */
+	int num_of_fragment_replicas = input->m_num_of_fragment_replicas;
+	int num_segments_on_host = input->m_num_segments_on_host;/* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */
+	int num_working_segs = input->m_num_working_segs; /* the subset of Hawq segments that will do the processing - not all the Hawq segments in the cluster are involved */
+	bool enable_print_input_cluster = input->m_enable_print_input_cluster;
+	bool enable_print_input_fragments = input->m_enable_print_input_fragments;
+	bool enable_print_input_segments = input->m_enable_print_input_segments;
+	bool enable_print_allocated_fragments = input->m_enable_print_allocated_fragments;
+		
+	/* 2. Create the cluster */
+	char **cluster = create_cluster(num_hosts_in_cluster);
+	
+	if (enable_print_input_cluster)
+		print_cluster(cluster, num_hosts_in_cluster);
+	 	
+	/* 3. Input - data fragments */
+	input_fragments_list = spread_fragments_in_cluster(num_data_fragments, /* number of fragments in the data we are about to allocate */
+													   num_active_data_nodes, /* hosts */
+													   num_of_fragment_replicas, /* replicas */
+													   cluster, /* the whole cluster*/
+													   num_hosts_in_cluster/* the number of hosts in the cluster */);
+	if (enable_print_input_fragments)
+		print_fragment_list(input_fragments_list); 
+	
+	/* 4. Input - hawq segments */
+	total_segs = num_hosts_in_cluster * num_segments_on_host;
+	array_of_segs = create_array_of_segs(cluster, num_hosts_in_cluster, num_segments_on_host);	
+	array_of_primaries = create_array_of_primaries(total_segs);
+		
+	buildCdbComponentDatabases(total_segs, array_of_segs, array_of_primaries);	
+	if (enable_print_input_segments)
+		print_segments_list();
+
+    /* 5. Build QueryResource */
+    buildQueryResource(num_hosts_in_cluster*num_segments_on_host, array_of_segs);
+    will_return(GetActiveQueryResource, resource);
+    will_return(GetActiveQueryResource, resource);
+
+	/* 6. The actual unitest of distribute_work_2_gp_segments() */
+	segs_allocated_data = distribute_work_2_gp_segments(input_fragments_list, total_segs, num_working_segs);
+	if (enable_print_allocated_fragments)
+		print_allocated_fragments(segs_allocated_data, total_segs);
+	
+	/* 7. The validations - verifying that the expected output was obtained */
+	validate_total_fragments_allocated(segs_allocated_data, total_segs, num_data_fragments);
+	validate_max_load_per_segment(segs_allocated_data, total_segs, num_working_segs, num_data_fragments);
+	validate_all_working_segments_engagement(segs_allocated_data, total_segs, num_working_segs, num_data_fragments, num_hosts_in_cluster);
+	
+	/* 8. Cleanup */
+	freeQueryResource();
+	restoreCdbComponentDatabases();
+	clean_cluster(cluster, num_hosts_in_cluster);
+	clean_array_of_segs(array_of_segs, total_segs);
+	clean_allocated_fragments(segs_allocated_data, total_segs);
+	pfree(array_of_primaries);
+}
+
+/* create an array of segments based on the hosts in the cluster and the number of Hawq segments per host */
+static char** create_array_of_segs(char **cluster, int num_hosts, int num_segments_on_host)
+{
+	int i, j;
+	int total_segs = num_hosts * num_segments_on_host;
+	char **array_of_segs = (char**)palloc0(total_segs * sizeof(char *));
+	
+	for (i = 0; i < num_hosts; i++)
+	{
+		for (j = 0; j < num_segments_on_host; j++)
+		{
+			array_of_segs[i * num_segments_on_host + j] = pstrdup(cluster[i]);
+		}
+	}
+
+	return array_of_segs;
+}
+
+/* clean the array of Hawq segments */
+static void clean_array_of_segs(char **array_of_segs, int total_segments)
+{
+	int i;
+	
+	for (i = 0; i < total_segments; i++)
+		pfree(array_of_segs[i]);
+	pfree(array_of_segs);
+}
+
+static bool* create_array_of_primaries(int total_segments)
+{
+	int i;
+	bool *primaries = (bool*)palloc0(total_segments * sizeof(bool));
+	for (i = 0; i < total_segments; i++)
+		primaries[i] = true;
+		
+	return primaries;
+}
+
+/* assigns an ip to each host in a cluster of num_hosts hosts */
+static char** create_cluster(int num_hosts)
+{
+	char** cluster = (char**)palloc0(num_hosts * sizeof(char *));
+	int i;
+	char *prefix = "1.2.%d.%d";
+	int third_octet = 1; /* let's begin at 1 */
+	int fourth_octet = 1;
+	StringInfoData ip;
+	initStringInfo(&ip);
+	
+	for (i = 0; i < num_hosts; i++)
+	{
+		appendStringInfo(&ip, prefix, third_octet, fourth_octet);
+		cluster[i] = pstrdup(ip.data);
+		/* this naming scheme accommodates a cluster size of up to 255x255 = 65025 (1.2.1.1, 1.2.1.2, ..., 1.2.255.255) */
+		fourth_octet++;
+		if (fourth_octet == 256)
+		{
+			fourth_octet = 1;
+			third_octet++;
+		}
+		resetStringInfo(&ip);
+	}
+
+	return  cluster;
+}
+
+/* release memory */
+static void clean_cluster(char** cluster, int num_hosts)
+{
+	int i;
+	
+	for (i = 0; i < num_hosts; i++)
+	{
+		if (cluster[i])
+			pfree(cluster[i]);
+	}
+	pfree(cluster);
+}
+
+/* show the cluster */
+static void print_cluster(char** cluster, int num_hosts)
+{
+	int i;
+	StringInfoData msg;
+	initStringInfo(&msg);
+	
+	appendStringInfo(&msg, "cluster size: %d\n", num_hosts);
+	for (i = 0; i < num_hosts; i++)
+	{
+		if (cluster[i])
+			appendStringInfo(&msg, "cluster #%d:   %s\n", i + 1, cluster[i]);
+		else
+			appendStringInfo(&msg, "cluster naming error \n");
+	}
+	
+	elog(FRAGDEBUG, "%s", msg.data);
+	pfree(msg.data);
+}
+
+/* prints, for each segment, the index and the host ip */
+static void print_segments_list()
+{
+	StringInfoData msg;
+	CdbComponentDatabases *test_cdb = GpAliveSegmentsInfo.cdbComponentDatabases;
+	initStringInfo(&msg);
+
+	for (int i = 0; i < test_cdb->total_segment_dbs; ++i)
+	{
+		CdbComponentDatabaseInfo* component = &test_cdb->segment_db_info[i];
+		appendStringInfo(&msg, "\nsegment -- index: %d, ip: %s", component->segindex, component->hostip);
+	}
+	
+	elog(FRAGDEBUG, "%s", msg.data);
+	pfree(msg.data);
+}
+
+/* returns the ip of the segment's host */
+static char* find_segment_ip_by_index(int seg_index)
+{	
+	CdbComponentDatabases *test_cdb = GpAliveSegmentsInfo.cdbComponentDatabases;
+	if (seg_index < 0 || seg_index >= test_cdb->total_segment_dbs)
+		assert_true(false);
+		
+	for (int i = 0; i < test_cdb->total_segment_dbs; ++i)
+	{
+		CdbComponentDatabaseInfo* seg = &test_cdb->segment_db_info[i];
+		if (seg->segindex == seg_index)
+			return seg->hostip;
+	}
+	
+	/* fail if no segment with the supplied index was found */
+	assert_true(false);
+	return NULL;
+}
+
+/* 
+ * print the allocated fragments list 
+ * allocated_fragments is an array of lists. The size of the array is total_segs.
+ * The list located at index i in the array holds the fragments that will be processed
+ * by Hawq segment i
+ */
+static void print_allocated_fragments(List **allocated_fragments, int total_segs)
+{
+	StringInfoData msg;
+	initStringInfo(&msg);
+	appendStringInfo(&msg, "ALLOCATED FRAGMENTS FOR EACH SEGMENT:\n");
+
+	for (int i = 0; i < total_segs; i++)
+	{
+		if (allocated_fragments[i])
+		{
+			ListCell *frags_cell = NULL;
+			foreach(frags_cell, allocated_fragments[i])
+			{
+				AllocatedDataFragment *frag = (AllocatedDataFragment*)lfirst(frags_cell);
+				appendStringInfo(&msg, "%s\n", print_one_allocated_data_fragment(frag, i));
+			}
+		}
+	}
+		
+	elog(FRAGDEBUG, "%s", msg.data);
+	if (msg.data)
+		pfree(msg.data);
+}
+
+/* print one allocated fragment */
+static char* print_one_allocated_data_fragment(AllocatedDataFragment *frag, int seg_index)
+{
+	StringInfoData msg;
+	initStringInfo(&msg);
+	char* seg_ip = find_segment_ip_by_index(seg_index);
+	if (!seg_ip)
+		seg_ip = "INVALID SEGMENT INDEX";
+	bool locality = (strcmp(frag->host, seg_ip) == 0);
+	
+	appendStringInfo(&msg, 
+	                 "locality: %d, segment number: %d , segment ip: %s --- fragment index: %d, datanode host: %s, file: %s", 
+	                 locality, seg_index, seg_ip, frag->index, frag->host, frag->source_name);
+
+	return msg.data;
+}
+
+/* release memory of allocated_fragments */
+void clean_allocated_fragments(List **allocated_fragments, int total_segs)
+{
+	for (int i = 0; i < total_segs; i++)
+		if (allocated_fragments[i])
+			free_allocated_frags(allocated_fragments[i]);
+	pfree(allocated_fragments);
+}
+
+/* calculate the optimal load distribution per segment: the ceiling of total fragments over working segments */
+static int calc_load_per_segment(int input_total_fragments, int working_segs)
+{
+	return (input_total_fragments % working_segs) ? input_total_fragments / working_segs + 1
+	                                              : input_total_fragments / working_segs;
+}
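+
+/*
+ * Example: with 10 fragments and 4 working segments, 10 % 4 != 0, so the
+ * cap is 10 / 4 + 1 = 3 fragments per segment; with 8 fragments and 4
+ * working segments it is exactly 8 / 4 = 2.
+ */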
+
+/* 
+ * test that a host is unique.
+ * the function ensures that ip names are unique by managing a set of ips;
+ * the set is implemented as a linked list
+ */
+static bool is_host_uniq(List** ips_list, char* ip)
+{
+	ListCell* cell;
+	foreach(cell, *ips_list)
+	{
+		char* foundip = (char*)lfirst(cell);
+		if (strcmp(foundip, ip) == 0)
+			return false;
+	}
+	
+	*ips_list = lappend(*ips_list, ip); /* keep lappend's return value - appending to NIL builds a new list */
+	return true;
+}
+
+/* validate that all input blocks were allocated */
+static void validate_total_fragments_allocated(List **allocated_fragments, int total_segs, int input_total_fragments)
+{
+	int total_fragments_allocated = 0; 
+
+	for (int i = 0; i < total_segs; i++)
+	{
+		if (allocated_fragments[i])
+			total_fragments_allocated += list_length(allocated_fragments[i]);
+	}
+
+	assert_int_equal(total_fragments_allocated, input_total_fragments);
+}
+
+/* validate that the load per segment does not exceed the expected load */
+static void validate_max_load_per_segment(List **allocated_fragments, int total_segs, int working_segs, int input_total_fragments)
+{
+	int max_load = 0;
+	int load_per_segment = calc_load_per_segment(input_total_fragments, working_segs);
+	
+	for (int i = 0; i < total_segs; i++)
+	{
+		if (allocated_fragments[i] && list_length(allocated_fragments[i]) > max_load)
+			max_load = list_length(allocated_fragments[i]);
+	}
+	
+	bool load_per_segment_not_exceeded = load_per_segment >= max_load;
+	elog(FRAGDEBUG, "actual max_load: %d, expected load_per_segment: %d", max_load, load_per_segment);
+	assert_true(load_per_segment_not_exceeded);
+}
+
+/* 
+ * we validate that every working segment is engaged: when load_per_segment is
+ * greater than one, every working segment must have allocated fragments; when
+ * load_per_segment is 1, the number of segments that got work must equal the
+ * number of fragments
+ */
+static void validate_all_working_segments_engagement(List **allocated_fragments, 
+													 int total_segs, 
+													 int working_segs, 
+													 int input_total_fragments,
+													 int num_hosts_in_cluster)
+{
+	List* ips_list = NIL;
+	ListCell* cell;
+	int total_segs_engaged = 0;
+	int load_per_segment = calc_load_per_segment(input_total_fragments, working_segs);
+	bool require_full_distribution = num_hosts_in_cluster >= working_segs;
+	
+	for (int i = 0; i < total_segs; i++)
+		if (allocated_fragments[i] && list_length(allocated_fragments[i]) > 0)
+		{
+			char *ip;
+			bool isuniq;
+			total_segs_engaged++;
+			if (require_full_distribution)
+			{
+				ip = find_segment_ip_by_index(i);
+				isuniq = is_host_uniq(&ips_list, ip);
+				assert_true(isuniq);
+			}
+		}
+	
+	if (load_per_segment == 1)
+		assert_int_equal(total_segs_engaged, input_total_fragments);
+	else
+	{
+		bool total_segs_engaged_not_exceeded = total_segs_engaged <= working_segs;
+		assert_true(total_segs_engaged_not_exceeded);
+	}
+	
+	/* clean memory */
+	foreach(cell, ips_list)
+		pfree(lfirst(cell));
+	list_free(ips_list);
+}
+
+/*
+ * Creates a list of DataFragment for one file ("file.txt").
+ * The important thing here is the fragments' location. It is determined by the parameters:
+ * replication_factor - number of copies of each fragment on the different hosts.
+ * number_of_hosts - number of hosts
+ * number_of_fragments - number of fragments in the file.
+ * cluster - holds the ips of all hosts in the cluster
+ *
+ * Each fragment will have <replication_factor> hosts from the cluster
+ */
+static List* 
+spread_fragments_in_cluster(int number_of_fragments, 
+								  int number_of_hosts, 
+								  int replication_factor, 
+								  char **cluster, 
+								  int cluster_size)
+{
+	int first_host, target_host;
+	List* fragments_list = NIL;
+	StringInfoData string_info;
+	initStringInfo(&string_info);
+
+	/* pick the first host in the cluster that will host the data. The fragments will be spread from this host onward */
+	first_host = 0;
+	
+	target_host = first_host;
+	
+	for (int i = 0; i < number_of_fragments; ++i)
+	{
+		DataFragment* fragment = (DataFragment*) palloc0(sizeof(DataFragment));
+		
+		fragment->index = i;
+		fragment->source_name = pstrdup("file.txt");
+		
+		for (int j = 0; j < replication_factor; ++j)
+		{
+			FragmentHost* fhost = (FragmentHost*)palloc0(sizeof(FragmentHost));
+			appendStringInfoString(&string_info, cluster[target_host]); /* the ip is data, not a format string */
+			fhost->ip = pstrdup(string_info.data);
+			resetStringInfo(&string_info);
+			fragment->replicas = lappend(fragment->replicas, fhost);
+			
+			target_host = ((j + i + first_host) % number_of_hosts);
+		}
+		assert_int_equal(list_length(fragment->replicas), replication_factor);
+		appendStringInfo(&string_info, "metadata %d", i);
+		fragment->fragment_md = pstrdup(string_info.data);
+		resetStringInfo(&string_info);
+		appendStringInfo(&string_info, "user data %d", i);
+		fragment->user_data = pstrdup(string_info.data);
+		resetStringInfo(&string_info);
+		fragments_list = lappend(fragments_list, fragment);
+	}
+
+	pfree(string_info.data);
+	return fragments_list;
+}
+
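As a quick illustration of spread_fragments_in_cluster() above, the same
(j + i + first_host) % number_of_hosts stepping can be reproduced in a
self-contained C sketch (sizes are hypothetical; this is independent of the
commit itself):

#include <stdio.h>

/* sketch: spread 4 fragments, 2 replicas each, over a 3-host cluster,
 * stepping the target host exactly as spread_fragments_in_cluster() does */
int main(void)
{
	const int number_of_fragments = 4;
	const int replication_factor = 2;
	const int number_of_hosts = 3;
	const int first_host = 0;
	int target_host = first_host;

	for (int i = 0; i < number_of_fragments; ++i)
		for (int j = 0; j < replication_factor; ++j)
		{
			printf("fragment %d replica %d -> host %d\n", i, j, target_host);
			target_host = (j + i + first_host) % number_of_hosts;
		}
	return 0;
}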

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c b/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
new file mode 100644
index 0000000..3d3dbe5
--- /dev/null
+++ b/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
@@ -0,0 +1,99 @@
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "hd_work_mgr_mock.h"
+
+
+/*
+ * check that element list_index in segment_list
+ * has the expected hostip.
+ */
+void check_segment_info(List* segment_list, int list_index,
+						const char* expected_hostip)
+{
+
+	CdbComponentDatabaseInfo* seg_info =
+			(CdbComponentDatabaseInfo*)lfirst(list_nth_cell(segment_list, list_index));
+	assert_string_equal(seg_info->hostip, expected_hostip);
+}
+
+/*
+ * Test clustering of segments to hosts.
+ * Environment: 10 segments over 3 hosts, all primary.
+ */
+void 
+test__do_segment_clustering_by_host__10SegmentsOn3Hosts(void **state)
+{
+	List* groups = NIL;
+	ListCell* cell = NULL;
+	GpHost* gphost = NULL;
+	List* segs = NIL;
+	CdbComponentDatabaseInfo* seg_info = NULL;
+
+	char* array_of_segs[10] =
+		{"1.2.3.1", "1.2.3.1", "1.2.3.1", "1.2.3.1",
+		 "1.2.3.2", "1.2.3.2", "1.2.3.2",
+		 "1.2.3.3", "1.2.3.3", "1.2.3.3"
+	};
+	bool array_of_primaries[10] =
+	{
+		true, true, true, true,
+		true, true, true,
+		true, true, true
+	};
+	int number_of_segments = 10;
+	/* sanity */
+	assert_true(number_of_segments == (sizeof(array_of_segs) / sizeof(array_of_segs[0])));
+	assert_true(number_of_segments == (sizeof(array_of_primaries) / sizeof(array_of_primaries[0])));
+
+	buildCdbComponentDatabases(number_of_segments, array_of_segs, array_of_primaries);
+
+	CdbComponentDatabases *cdb = GpAliveSegmentsInfo.cdbComponentDatabases;
+
+	/* sanity for cdbComponentDatabases building*/
+	assert_int_equal(cdb->total_segment_dbs, number_of_segments);
+	assert_string_equal(cdb->segment_db_info[4].hostip, array_of_segs[4]);
+
+	/* build QueryResource */
+	buildQueryResource(10, array_of_segs);
+	will_return(GetActiveQueryResource, resource);
+
+	/* test do_segment_clustering_by_host */
+	groups = do_segment_clustering_by_host();
+
+	assert_int_equal(list_length(groups), 3);
+
+	cell = list_nth_cell(groups, 0);
+	gphost = (GpHost*)lfirst(cell);
+	assert_string_equal(gphost->ip, array_of_segs[0]);
+	assert_int_equal(list_length(gphost->segs), 4);
+	for (int i = 0; i < 4; ++i)
+	{
+		check_segment_info(gphost->segs, i, "1.2.3.1");
+	}
+
+	cell = list_nth_cell(groups, 1);
+	gphost = (GpHost*)lfirst(cell);
+	assert_string_equal(gphost->ip, "1.2.3.2");
+	assert_int_equal(list_length(gphost->segs), 3);
+	for (int i = 0; i < 3; ++i)
+	{
+		check_segment_info(gphost->segs, i, "1.2.3.2");
+	}
+
+	cell = list_nth_cell(groups, 2);
+	gphost = (GpHost*)lfirst(cell);
+	assert_string_equal(gphost->ip, "1.2.3.3");
+	assert_int_equal(list_length(gphost->segs), 3);
+	for (int i = 0; i < 3; ++i)
+	{
+		check_segment_info(gphost->segs, i, "1.2.3.3");
+	}
+
+	freeQueryResource();
+	restoreCdbComponentDatabases();
+}
+
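The per-host grouping this test expects can be sketched without any backend
machinery; the standalone C program below (illustrative only, not part of
this commit) clusters the same 10 segment IPs and prints group sizes 4, 3
and 3:

#include <stdio.h>
#include <string.h>

int main(void)
{
	const char *segs[10] = {"1.2.3.1", "1.2.3.1", "1.2.3.1", "1.2.3.1",
	                        "1.2.3.2", "1.2.3.2", "1.2.3.2",
	                        "1.2.3.3", "1.2.3.3", "1.2.3.3"};
	const char *hosts[10];
	int counts[10];
	int nhosts = 0;

	for (int i = 0; i < 10; i++)
	{
		int h;
		/* look for an existing group with this host ip */
		for (h = 0; h < nhosts; h++)
			if (strcmp(hosts[h], segs[i]) == 0)
			{
				counts[h]++;
				break;
			}
		/* no group yet - open one */
		if (h == nhosts)
		{
			hosts[nhosts] = segs[i];
			counts[nhosts] = 1;
			nhosts++;
		}
	}
	for (int h = 0; h < nhosts; h++)
		printf("host %s: %d segments\n", hosts[h], counts[h]);
	return 0;
}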

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/hd_work_mgr_mock.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_mock.c b/src/backend/access/external/test/hd_work_mgr_mock.c
new file mode 100644
index 0000000..a03313c
--- /dev/null
+++ b/src/backend/access/external/test/hd_work_mgr_mock.c
@@ -0,0 +1,100 @@
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "hd_work_mgr_mock.h"
+
+/*
+ * Helper functions to create and restore the GpAliveSegmentsInfo.cdbComponentDatabases element
+ * used by hd_work_mgr
+ */
+
+/*
+ * Builds an array of CdbComponentDatabaseInfo.
+ * Each segment is assigned a sequence number and an ip.
+ * segs_num - the number of segments
+ * segs_hostips - array of the ip of each segment
+ * primaries_map - array of which segments are primaries
+ */
+void buildCdbComponentDatabases(int segs_num,
+								char* segs_hostips[],
+								bool primaries_map[])
+{
+	CdbComponentDatabases *test_cdb = palloc0(sizeof(CdbComponentDatabases));
+	CdbComponentDatabaseInfo* component = NULL;
+	test_cdb->total_segment_dbs = segs_num;
+	test_cdb->segment_db_info =
+			(CdbComponentDatabaseInfo *) palloc0(sizeof(CdbComponentDatabaseInfo) * test_cdb->total_segment_dbs);
+
+	for (int i = 0; i < test_cdb->total_segment_dbs; ++i)
+	{
+		component = &test_cdb->segment_db_info[i];
+		component->segindex = i;
+		component->role = primaries_map[i] ? SEGMENT_ROLE_PRIMARY : SEGMENT_ROLE_MIRROR;
+		component->hostip = pstrdup(segs_hostips[i]);
+	}
+
+	orig_cdb = GpAliveSegmentsInfo.cdbComponentDatabases;
+	orig_seg_count = GpAliveSegmentsInfo.aliveSegmentsCount;
+	GpAliveSegmentsInfo.cdbComponentDatabases = test_cdb;
+	GpAliveSegmentsInfo.aliveSegmentsCount = segs_num;
+}
+
+void restoreCdbComponentDatabases()
+{
+	/* free test CdbComponentDatabases */
+	if (GpAliveSegmentsInfo.cdbComponentDatabases)
+		freeCdbComponentDatabases(GpAliveSegmentsInfo.cdbComponentDatabases);
+
+	GpAliveSegmentsInfo.cdbComponentDatabases = orig_cdb;
+	GpAliveSegmentsInfo.aliveSegmentsCount = orig_seg_count;
+}
+
+/* Builds the QueryResource for a query */
+void buildQueryResource(int segs_num,
+                        char * segs_hostips[])
+{
+	resource = (QueryResource *)palloc(sizeof(QueryResource));
+
+	resource->segments = NULL;
+	for (int i = 0; i < segs_num; ++i)
+	{
+		Segment *segment = (Segment *)palloc0(sizeof(Segment));
+		segment->hostip = pstrdup(segs_hostips[i]);
+		segment->segindex = i;
+		segment->alive = true;
+
+		resource->segments = lappend(resource->segments, segment);
+	}
+
+}
+
+/* Frees the QueryResource for a query */
+void freeQueryResource()
+{
+	if (resource)
+	{
+		ListCell *cell;
+
+		if (resource->segments)
+		{
+			cell = list_head(resource->segments);
+			while(cell != NULL)
+			{
+				ListCell *tmp = cell;
+				cell = lnext(cell);
+
+				pfree(((Segment *)lfirst(tmp))->hostip);
+				pfree(lfirst(tmp));
+				pfree(tmp);
+			}
+		}
+
+		pfree(resource);
+		resource = NULL;
+	}
+}
+
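A test built on these helpers typically follows a build/exercise/restore
pattern; a minimal sketch (the test name and body are hypothetical, while the
helper calls and the `resource` global are the ones defined in this file):

void test__example__with_mocked_cdb(void **state)
{
	char *ips[2] = {"1.2.3.1", "1.2.3.2"};
	bool primaries[2] = {true, true};

	/* install a fake CdbComponentDatabases and a QueryResource */
	buildCdbComponentDatabases(2, ips, primaries);
	buildQueryResource(2, ips);
	will_return(GetActiveQueryResource, resource);

	/* ... exercise the hd_work_mgr code under test here ... */

	/* tear down the fakes and restore the originals */
	freeQueryResource();
	restoreCdbComponentDatabases();
}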

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/hd_work_mgr_mock.h
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_mock.h b/src/backend/access/external/test/hd_work_mgr_mock.h
new file mode 100644
index 0000000..2eda33e
--- /dev/null
+++ b/src/backend/access/external/test/hd_work_mgr_mock.h
@@ -0,0 +1,84 @@
+#ifndef HD_WORK_MGR_MOCK_
+#define HD_WORK_MGR_MOCK_
+
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "../hd_work_mgr.c"
+
+static CdbComponentDatabases *orig_cdb = NULL;
+static int orig_seg_count = -1;
+
+static QueryResource * resource = NULL;
+
+struct AliveSegmentsInfo
+{
+	uint64			fts_statusVersion;
+	TransactionId	tid;
+
+	/* Release gangs */
+	bool			cleanGangs;
+
+	/* Used for debug & test */
+	bool			forceUpdate;
+	int				failed_segmentid_start;
+	int				failed_segmentid_number;
+
+	/* Used for dispatcher. */
+	int4			aliveSegmentsCount;
+	int4			singleton_segindex;
+	Bitmapset		*aliveSegmentsBitmap;
+	struct CdbComponentDatabases *cdbComponentDatabases;
+};
+
+typedef struct AliveSegmentsInfo AliveSegmentsInfo;
+
+AliveSegmentsInfo GpAliveSegmentsInfo = {0, 0, false, false, 0, 0, UNINITIALIZED_GP_IDENTITY_VALUE, 0, NULL, NULL};
+
+/*
+ * Helper functions copied from backend/cdb/cdbutils.c
+ */
+
+/*
+ * _freeCdbComponentDatabases
+ *
+ * Releases the storage occupied by the CdbComponentDatabases
+ * struct pointed to by the argument.
+ */
+void
+_freeCdbComponentDatabases(CdbComponentDatabases *pDBs);
+
+/*
+ * _freeCdbComponentDatabaseInfo:
+ * Releases any storage allocated for members variables of a CdbComponentDatabaseInfo struct.
+ */
+void
+_freeCdbComponentDatabaseInfo(CdbComponentDatabaseInfo *cdi);
+
+/*
+ * Helper functions to create and restore the GpAliveSegmentsInfo.cdbComponentDatabases element
+ * used by hd_work_mgr
+ */
+
+/*
+ * Builds an array of CdbComponentDatabaseInfo.
+ * Each segment is assigned a sequence number and an ip.
+ * segs_num - the number of segments
+ * segs_hostips - array of the ip of each segment
+ * primaries_map - array of which segments are primaries
+ */
+void buildCdbComponentDatabases(int segs_num,
+								char* segs_hostips[],
+								bool primaries_map[]);
+
+
+void restoreCdbComponentDatabases();
+
+void buildQueryResource(int segs_num,
+                        char * segs_hostips[]);
+void freeQueryResource();
+                                  
+#endif //HD_WORK_MGR_MOCK_

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/hd_work_mgr_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_test.c b/src/backend/access/external/test/hd_work_mgr_test.c
new file mode 100644
index 0000000..382927b
--- /dev/null
+++ b/src/backend/access/external/test/hd_work_mgr_test.c
@@ -0,0 +1,33 @@
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "hd_work_mgr_mock.c"
+#include "hd_work_mgr_do_segment_clustering_by_host_test.c"
+#include "hd_work_mgr_allocate_fragments_to_datanodes_test.c"
+#include "hd_work_mgr_distribute_work_2_gp_segments_test.c"
+
+int 
+main(int argc, char* argv[]) 
+{
+	cmockery_parse_arguments(argc, argv);
+
+	const UnitTest tests[] = {
+			unit_test(test__do_segment_clustering_by_host__10SegmentsOn3Hosts),
+			unit_test(test__get_dn_processing_load),
+			unit_test(test__create_allocated_fragment__NoUserData),
+			unit_test(test__create_allocated_fragment__WithUserData),
+			unit_test(test__allocate_fragments_to_datanodes__4Fragments10Hosts3Replicates),
+			unit_test(test__allocate_fragments_to_datanodes__4Fragments3Hosts2Replicates),
+			unit_test(test__allocate_fragments_to_datanodes__4Fragments3Hosts1Replicates),
+			unit_test(test__allocate_fragments_to_datanodes__7Fragments10Hosts1Replicates),
+			unit_test(test__distribute_work_to_gp_segments__big_cluster_few_active_nodes),	
+			unit_test(test__distribute_work_to_gp_segments__big_cluster_many_active_nodes),
+			unit_test(test__distribute_work_to_gp_segments__small_cluster),
+			unit_test(test__distribute_work_to_gp_segments__small_cluster_many_active_nodes),
+			unit_test(test__distribute_work_to_gp_segments__small_cluster_few_replicas)
+	};
+	return run_tests(tests);
+}

