From: huor@apache.org To: commits@hawq.incubator.apache.org Reply-To: dev@hawq.incubator.apache.org Date: Mon, 02 Nov 2015 06:13:59 -0000 Message-Id: <2e645c4094fe4170856ced7db8e25041@git.apache.org> In-Reply-To: <7e3639dfb5264b8b8c3625b358226e2f@git.apache.org> References: <7e3639dfb5264b8b8c3625b358226e2f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] incubator-hawq git commit: HAWQ-86. Fix and re-enable unit test for pxf and external storage HAWQ-86.
Fix and re-enable unit test for pxf and external storage Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/7c2f615d Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/7c2f615d Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/7c2f615d Branch: refs/heads/master Commit: 7c2f615d207f342218b5c77832875cbb03301306 Parents: 6d743f1 Author: Ruilong Huo Authored: Mon Nov 2 14:13:00 2015 +0800 Committer: Ruilong Huo Committed: Mon Nov 2 14:13:00 2015 +0800 ---------------------------------------------------------------------- src/backend/access/external/test/.gitignore | 1 + src/backend/access/external/test/Makefile | 66 ++ src/backend/access/external/test/README.txt | 6 + .../access/external/test/ha_config_mock.c | 20 + .../access/external/test/ha_config_test.c | 419 +++++++++++++ ...k_mgr_allocate_fragments_to_datanodes_test.c | 597 ++++++++++++++++++ ...ork_mgr_distribute_work_2_gp_segments_test.c | 623 +++++++++++++++++++ ...ork_mgr_do_segment_clustering_by_host_test.c | 99 +++ .../access/external/test/hd_work_mgr_mock.c | 100 +++ .../access/external/test/hd_work_mgr_mock.h | 84 +++ .../access/external/test/hd_work_mgr_test.c | 33 + .../access/external/test/pxffilters_test.c | 556 +++++++++++++++++ .../access/external/test/pxfheaders_test.c | 223 +++++++ .../access/external/test/pxfmasterapi_test.c | 231 +++++++ .../access/external/test/pxfuriparser_test.c | 352 +++++++++++ .../access/external/test_discard/.gitignore | 1 - .../access/external/test_discard/Makefile | 66 -- .../access/external/test_discard/README.txt | 6 - .../external/test_discard/ha_config_mock.c | 20 - .../external/test_discard/ha_config_test.c | 419 ------------- ...k_mgr_allocate_fragments_to_datanodes_test.c | 597 ------------------ ...ork_mgr_distribute_work_2_gp_segments_test.c | 621 ------------------ ...ork_mgr_do_segment_clustering_by_host_test.c | 175 ------ .../external/test_discard/hd_work_mgr_mock.c | 53 -- .../external/test_discard/hd_work_mgr_mock.h | 54 -- .../external/test_discard/hd_work_mgr_test.c | 34 - .../external/test_discard/pxffilters_test.c | 556 ----------------- .../external/test_discard/pxfheaders_test.c | 223 ------- .../external/test_discard/pxfmasterapi_test.c | 231 ------- .../external/test_discard/pxfuriparser_test.c | 352 ----------- 30 files changed, 3410 insertions(+), 3408 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/.gitignore ---------------------------------------------------------------------- diff --git a/src/backend/access/external/test/.gitignore b/src/backend/access/external/test/.gitignore new file mode 100644 index 0000000..a8d6b6c --- /dev/null +++ b/src/backend/access/external/test/.gitignore @@ -0,0 +1 @@ +*.t http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/Makefile ---------------------------------------------------------------------- diff --git a/src/backend/access/external/test/Makefile b/src/backend/access/external/test/Makefile new file mode 100644 index 0000000..e13d107 --- /dev/null +++ b/src/backend/access/external/test/Makefile @@ -0,0 +1,66 @@ +subdir=src/backend/access/external +top_builddir=../../../../.. + +TARGETS=pxfuriparser hd_work_mgr pxfheaders ha_config pxffilters pxfmasterapi + +# Objects from backend, which don't need to be mocked but need to be linked. 
+COMMON_REAL_OBJS=\ + $(top_srcdir)/src/backend/access/hash/hashfunc.o \ + $(top_srcdir)/src/backend/bootstrap/bootparse.o \ + $(top_srcdir)/src/backend/lib/stringinfo.o \ + $(top_srcdir)/src/backend/nodes/bitmapset.o \ + $(top_srcdir)/src/backend/nodes/equalfuncs.o \ + $(top_srcdir)/src/backend/nodes/list.o \ + $(top_srcdir)/src/backend/parser/gram.o \ + $(top_srcdir)/src/backend/regex/regcomp.o \ + $(top_srcdir)/src/backend/regex/regerror.o \ + $(top_srcdir)/src/backend/regex/regexec.o \ + $(top_srcdir)/src/backend/regex/regfree.o \ + $(top_srcdir)/src/backend/storage/page/itemptr.o \ + $(top_srcdir)/src/backend/utils/adt/datum.o \ + $(top_srcdir)/src/backend/utils/adt/like.o \ + $(top_srcdir)/src/backend/utils/error/elog.o \ + $(top_srcdir)/src/backend/utils/hash/dynahash.o \ + $(top_srcdir)/src/backend/utils/hash/hashfn.o \ + $(top_srcdir)/src/backend/utils/init/globals.o \ + $(top_srcdir)/src/backend/utils/mb/mbutils.o \ + $(top_srcdir)/src/backend/utils/mb/wchar.o \ + $(top_srcdir)/src/backend/utils/misc/guc.o \ + $(top_srcdir)/src/port/exec.o \ + $(top_srcdir)/src/port/path.o \ + $(top_srcdir)/src/port/pgsleep.o \ + $(top_srcdir)/src/port/pgstrcasecmp.o \ + $(top_srcdir)/src/port/qsort.o \ + $(top_srcdir)/src/port/strlcpy.o \ + $(top_srcdir)/src/port/thread.o \ + $(top_srcdir)/src/timezone/localtime.o \ + $(top_srcdir)/src/timezone/pgtz.o \ + $(top_srcdir)/src/timezone/strftime.o \ + $(top_srcdir)/src/backend/utils/mmgr/redzone_handler.o \ + +# Objects from backend, which don't need to be mocked but need to be linked. +pxfuriparser_REAL_OBJS=$(COMMON_REAL_OBJS) \ + $(top_srcdir)/src/backend/utils/adt/formatting.o \ + $(top_srcdir)/src/backend/nodes/value.o \ + $(top_srcdir)/src/backend/utils/adt/numutils.o \ + $(top_srcdir)/src/backend/access/external/pxfutils.o +hd_work_mgr_REAL_OBJS=$(COMMON_REAL_OBJS) \ + $(top_srcdir)/src/backend/cdb/cdbutil.o \ + $(top_srcdir)/src/backend/access/external/pxfuriparser.o \ + $(top_srcdir)/src/backend/access/external/pxfutils.o + +# numutils for pg_ltoa +pxfheaders_REAL_OBJS=$(COMMON_REAL_OBJS) \ + $(top_srcdir)/src/backend/utils/adt/numutils.o +ha_config_REAL_OBJS=$(COMMON_REAL_OBJS) \ + $(top_srcdir)/src/backend/utils/fmgrtab.o \ + $(top_srcdir)/src/backend/utils/adt/numutils.o +pxfmasterapi_REAL_OBJS=$(COMMON_REAL_OBJS) \ + $(top_srcdir)/src/backend/utils/fmgrtab.o +pxffilters_REAL_OBJS=$(COMMON_REAL_OBJS) \ + $(top_srcdir)/src/backend/optimizer/util/clauses.o \ + $(top_srcdir)/src/backend/parser/parse_expr.o + +include ../../../../Makefile.mock + +MOCK_LIBS += -ljson http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/README.txt ---------------------------------------------------------------------- diff --git a/src/backend/access/external/test/README.txt b/src/backend/access/external/test/README.txt new file mode 100644 index 0000000..75a03f0 --- /dev/null +++ b/src/backend/access/external/test/README.txt @@ -0,0 +1,6 @@ +Directory with the following System Under Test (SUT): + - pxfuriparser.c + - hd_work_mgr.c + - pxfheaders.c + - ha_config.c + - pxffilters.c http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/ha_config_mock.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/test/ha_config_mock.c b/src/backend/access/external/test/ha_config_mock.c new file mode 100644 index 0000000..dcf2b12 --- /dev/null +++ b/src/backend/access/external/test/ha_config_mock.c @@ -0,0 +1,20 @@ 
+#include <stdarg.h> +#include <stddef.h> +#include <setjmp.h> +#include "cmockery.h" + +#include "c.h" +#include "hdfs/hdfs.h" + +/* Mock functions for the hdfs.h APIs used by load_nn_ha_config() in ha_config.c */ + +Namenode * hdfsGetHANamenodes(const char *nameservice, int *size) +{ + optional_assignment(size); + return (Namenode *)mock(); +} + +void hdfsFreeNamenodeInformation(Namenode *namenodes, int size) +{ + mock(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/ha_config_test.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/test/ha_config_test.c b/src/backend/access/external/test/ha_config_test.c new file mode 100644 index 0000000..61cf1de --- /dev/null +++ b/src/backend/access/external/test/ha_config_test.c @@ -0,0 +1,419 @@ +#include <stdarg.h> +#include <stddef.h> +#include <setjmp.h> +#include "cmockery.h" + +#include "c.h" +#include "../ha_config.c" +#include "ha_config_mock.c" +#include "lib/stringinfo.h" +#include "utils/builtins.h" + +/* Helper functions signature */ +#define format_string_release(p) (pfree(p)) + +static char *format_string_create(char *f,...); +static void handle_pg_excp(char *msg, int errcode); + +/* + * Unit test for GPHD_HA_load_nodes() in ../access/external/ha_config.c + * GPHD_HA_load_nodes() discovers the active Namenode from an HA Namenode pair. + * It does this by interacting with the API exposed by hdfs.h, from which it uses + * 2 functions: + * a. Namenode * hdfsGetHANamenodes(const char * nameservice, int * size); + * b. void hdfsFreeNamenodeInformation(Namenode * namenodes, int size); + * This unit test verifies the correct interaction between the GPHD_HA_load_nodes() implementation + * and the 2 hdfs.h APIs. It looks at the standard flows with expected input configuration + * and also at limit cases with corrupted input configuration. + * The mock functions for the two hdfs.h APIs are in ha_config_mock.c. + */ + +/* + * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice); + * Mock function: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size) + * Negative test: GPHD_HA_load_nodes() receives a nonexistent nameservice + */ +void +test__GPHD_HA_load_nodes__UnknownNameservice(void **state) +{ + /* + * In case it receives an unknown nameservice string, the real function hdfsGetHANamenodes() + * will return NULL. We instruct our mock function to return NULL. In this way we simulate + * an unknown_service scenario and verify that our SUT function GPHD_HA_load_nodes() correctly + * handles the NULL returned by hdfsGetHANamenodes(). + */ + will_return(hdfsGetHANamenodes, NULL); + + PG_TRY(); + { + NNHAConf *hac = GPHD_HA_load_nodes("UNKNOWN_SERVICE"); + } + PG_CATCH(); + { + char *msg = "nameservice UNKNOWN_SERVICE not found in client configuration. No HA namenodes provided"; + handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR); + return; + } + PG_END_TRY(); + + assert_true(false); +} + +/* + * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice); + * Mock function: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size) + * Negative test: hdfsGetHANamenodes() returns just one namenode. + * This is not a sustainable HA configuration.
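+ * (cmockery wiring, for reference: each will_return(fn, value) queues a value that + * the mock body later pops via mock(), and will_assign_value()/will_assign_string() + * write through out-parameters declared with optional_assignment(), as in + * ha_config_mock.c above.)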
+ */ +void +test__GPHD_HA_load_nodes__OneNN(void **state) +{ + unsigned int numn = 1; + Namenode nns[1]; + + will_return(hdfsGetHANamenodes, nns); + will_assign_value(hdfsGetHANamenodes, size, numn); + + + PG_TRY(); + { + NNHAConf *hac = GPHD_HA_load_nodes("NAMESERVICE"); + } + PG_CATCH(); + { + char *msg = format_string_create("High availability for nameservice %s was configured with only one node. A high availability scheme requires at least two nodes ", + "NAMESERVICE"); + handle_pg_excp(msg, ERRCODE_INTERNAL_ERROR); + format_string_release(msg); /* if we trip on assert_string_equal we don't free, but it doesn't matter because the process stops */ + return; + } + PG_END_TRY(); + + assert_true(false); + +} + +/* + * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice); + * Mock function: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size) + * Negative test: hdfsGetHANamenodes() returns a Namenode.rpc_address field without ":" + * the host:port delimiter. + */ +void +test__GPHD_HA_load_nodes__RpcDelimMissing(void **state) +{ + unsigned int numn = 2; + Namenode nns[] = { {"mdw2080", "mdw:50070"}, {"smdw:2080", "smdw:50070"}}; + + will_return(hdfsGetHANamenodes, nns); + will_assign_value(hdfsGetHANamenodes, size, numn); + + PG_TRY(); + { + NNHAConf *hac = GPHD_HA_load_nodes("NAMESERVICE"); + } + PG_CATCH(); + { + char *msg = "dfs.namenode.rpc-address was set incorrectly in the configuration. ':' missing"; + handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR); + return; + } + PG_END_TRY(); + + assert_true(false); + +} + +/* + * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice) + * Mock functions: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size) + * void port_to_str(char **port, int new_port) + * Positive test: port_to_str() assigns pxf_service_port correctly + */ +void +test__GPHD_HA_load_nodes__PxfServicePortIsAssigned(void **state) +{ + unsigned int numn = 2; + Namenode nns[] = { {"mdw:2080", "mdw:50070"}, {"smdw:2080", "smdw:50070"}}; + char strPort[30] = {0}; + pg_ltoa(pxf_service_port, strPort); + + will_return(hdfsGetHANamenodes, nns); + will_assign_value(hdfsGetHANamenodes, size, numn); + + will_be_called(port_to_str); + expect_not_value(port_to_str, port, NULL); + expect_value(port_to_str, new_port, pxf_service_port); + will_assign_string(port_to_str, port, strPort); + + will_be_called(port_to_str); + expect_not_value(port_to_str, port, NULL); + expect_value(port_to_str, new_port, pxf_service_port); + will_assign_string(port_to_str, port, strPort); + + will_be_called(hdfsFreeNamenodeInformation); + + + NNHAConf *hac = GPHD_HA_load_nodes("NAMESERVICE"); +} + +/* + * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice) + * Mock functions: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size) + * void port_to_str(char **port, int new_port) + * Negative test: hdfsGetHANamenodes() returns a Namenode.rpc_address field with the + * host part missing - just ":port". + */ +void +test__GPHD_HA_load_nodes__HostMissing(void **state) +{ + unsigned int numn = 2; + Namenode nns[] = { {":2080", "mdw:50070"}, {"smdw:2080", "smdw:50070"}}; + char strPort[30] = {0}; + pg_ltoa(pxf_service_port, strPort); + + will_return(hdfsGetHANamenodes, nns); + will_assign_value(hdfsGetHANamenodes, size, numn); + + will_be_called(port_to_str); + expect_not_value(port_to_str, port, NULL); + expect_value(port_to_str, new_port, pxf_service_port); + will_assign_string(port_to_str, port, strPort); + + will_be_called(port_to_str); + expect_not_value(port_to_str, port, NULL); + expect_value(port_to_str, new_port, pxf_service_port); + will_assign_string(port_to_str, port, strPort); + + will_be_called(hdfsFreeNamenodeInformation); + + PG_TRY(); + { + NNHAConf *hac = GPHD_HA_load_nodes("NAMESERVICE"); + } + PG_CATCH(); + { + char *msg = "HA Namenode host number 1 is NULL value"; + handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR); + return; + } + PG_END_TRY(); +} + +/* + * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice) + * Mock functions: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size) + * void port_to_str(char **port, int new_port) + * Negative test: port_to_str() does not set the port; the + * rpc_address carries only the host - "host:". + */ +void +test__GPHD_HA_load_nodes__PortMissing(void **state) +{ + unsigned int numn = 2; + Namenode nns[] = { {"mdw:", "mdw:50070"}, {"smdw:2080", "smdw:50070"}}; + + will_return(hdfsGetHANamenodes, nns); + will_assign_value(hdfsGetHANamenodes, size, numn); + + will_be_called(port_to_str); + expect_not_value(port_to_str, port, NULL); + expect_value(port_to_str, new_port, pxf_service_port); + + will_be_called(port_to_str); + expect_not_value(port_to_str, port, NULL); + expect_value(port_to_str, new_port, pxf_service_port); + + will_be_called(hdfsFreeNamenodeInformation); + + PG_TRY(); + { + NNHAConf *hac = GPHD_HA_load_nodes("NAMESERVICE"); + } + PG_CATCH(); + { + char *msg = "HA Namenode RPC port number 1 is NULL value"; + handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR); + return; + } + PG_END_TRY(); +} + +/* + * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice) + * Mock functions: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size) + * void port_to_str(char **port, int new_port) + * Negative test: port_to_str() returns a port outside the valid range + * - a number higher than 65535 + */ +void +test__GPHD_HA_load_nodes__PortIsInvalidNumber(void **state) +{ + unsigned int numn = 2; + Namenode nns[] = { {"mdw:2080", "mdw:65550"}, {"smdw:2080", "smdw:50070"}}; + + will_return(hdfsGetHANamenodes, nns); + will_assign_value(hdfsGetHANamenodes, size, numn); + + will_be_called(port_to_str); + expect_not_value(port_to_str, port, NULL); + expect_value(port_to_str, new_port, pxf_service_port); + will_assign_string(port_to_str, port, "65550"); + + will_be_called(port_to_str); + expect_not_value(port_to_str, port, NULL); + expect_value(port_to_str, new_port, pxf_service_port); + will_assign_string(port_to_str, port, "65550"); + + will_be_called(hdfsFreeNamenodeInformation); + + PG_TRY(); + { + NNHAConf *hac = GPHD_HA_load_nodes("NAMESERVICE"); + } + PG_CATCH(); + { + char *msg = "Invalid port <65550> detected in nameservice configuration"; + handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR); + return; + } + PG_END_TRY(); +}
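+ +/* + * The next two tests feed port strings that are not numbers. The validation in + * ha_config.c presumably follows strtol()-style parsing: strtol("100ab", &end, 10) + * stops at 'a' and leaves *end != '\0', while "melon" yields no digits at all - + * both must be rejected as invalid ports. + */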
+ +/* + * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice) + * Mock functions: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size) + * void port_to_str(char **port, int new_port) + * Negative test: port_to_str() returns a port that is not a number + */ +void +test__GPHD_HA_load_nodes__PortIsNotNumber_TakeOne(void **state) +{ + NNHAConf *hac; + unsigned int numn = 2; + Namenode nns[] = { {"mdw:2080", "mdw:50070"}, {"smdw:2080", "smdw:50070"}}; + + will_return(hdfsGetHANamenodes, nns); + will_assign_value(hdfsGetHANamenodes, size, numn); + + will_be_called(port_to_str); + expect_not_value(port_to_str, port, NULL); + expect_value(port_to_str, new_port, pxf_service_port); + will_assign_string(port_to_str, port, "melon"); + + will_be_called(port_to_str); + expect_not_value(port_to_str, port, NULL); + expect_value(port_to_str, new_port, pxf_service_port); + + will_be_called(hdfsFreeNamenodeInformation); + + PG_TRY(); + { + hac = GPHD_HA_load_nodes("NAMESERVICE"); + } + PG_CATCH(); + { + char *msg = "Invalid port <melon> detected in nameservice configuration"; + handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR); + return; + } + PG_END_TRY(); +} + +/* + * SUT function: NNHAConf* GPHD_HA_load_nodes(const char *nameservice) + * Mock functions: Namenode * hdfsGetHANamenodes(const char * nameservice, int * size) + * void port_to_str(char **port, int new_port) + * Negative test: port_to_str() returns a port that is not a number + */ +void +test__GPHD_HA_load_nodes__PortIsNotNumber_TakeTwo(void **state) +{ + NNHAConf *hac; + unsigned int numn = 2; + Namenode nns[] = { {"mdw:2080", "mdw:50070"}, {"smdw:2080", "smdw:50070"}}; + + will_return(hdfsGetHANamenodes, nns); + will_assign_value(hdfsGetHANamenodes, size, numn); + + will_be_called(port_to_str); + expect_not_value(port_to_str, port, NULL); + expect_value(port_to_str, new_port, pxf_service_port); + will_assign_string(port_to_str, port, "100ab"); + + will_be_called(port_to_str); + expect_not_value(port_to_str, port, NULL); + expect_value(port_to_str, new_port, pxf_service_port); + + will_be_called(hdfsFreeNamenodeInformation); + + PG_TRY(); + { + hac = GPHD_HA_load_nodes("NAMESERVICE"); + } + PG_CATCH(); + { + char *msg = "Invalid port <100ab> detected in nameservice configuration"; + handle_pg_excp(msg, ERRCODE_SYNTAX_ERROR); + return; + } + PG_END_TRY(); +} + +int +main(int argc, char *argv[]) +{ + cmockery_parse_arguments(argc, argv); + + const UnitTest tests[] = { + unit_test(test__GPHD_HA_load_nodes__UnknownNameservice), + unit_test(test__GPHD_HA_load_nodes__OneNN), + unit_test(test__GPHD_HA_load_nodes__RpcDelimMissing), + unit_test(test__GPHD_HA_load_nodes__PxfServicePortIsAssigned), + unit_test(test__GPHD_HA_load_nodes__HostMissing), + unit_test(test__GPHD_HA_load_nodes__PortMissing), + unit_test(test__GPHD_HA_load_nodes__PortIsInvalidNumber), + unit_test(test__GPHD_HA_load_nodes__PortIsNotNumber_TakeOne), + unit_test(test__GPHD_HA_load_nodes__PortIsNotNumber_TakeTwo) + }; + return run_tests(tests); +} + +/* + * Helper function to format strings that need to be passed to assert macros + */ +static char* +format_string_create(char *f,...) +{ + StringInfoData s; + va_list vl; + + initStringInfo(&s); + + va_start(vl,f); + appendStringInfoVA(&s, f, vl); + va_end(vl); + + return s.data; +} + +/* + * Encapsulates exception unpacking + */ +static void +handle_pg_excp(char *msg, int errcode) +{ + CurrentMemoryContext = 1; /* dummy non-NULL value so CopyErrorData(), which checks the current memory context, can run in the test environment */ + ErrorData *edata = CopyErrorData(); + + assert_true(edata->sqlerrcode == errcode); + assert_true(edata->elevel == ERROR); + assert_string_equal(edata->message, msg); + + /* Clean the internal error data stack.
Otherwise errordata_stack_depth in elog.c + * keeps growing from test to test with each ereport we issue in our SUT function + * until we reach errordata_stack_depth >= ERRORDATA_STACK_SIZE and our tests + * start failing + */ + elog_dismiss(INFO); +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/hd_work_mgr_allocate_fragments_to_datanodes_test.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/test/hd_work_mgr_allocate_fragments_to_datanodes_test.c b/src/backend/access/external/test/hd_work_mgr_allocate_fragments_to_datanodes_test.c new file mode 100644 index 0000000..fb42ee1 --- /dev/null +++ b/src/backend/access/external/test/hd_work_mgr_allocate_fragments_to_datanodes_test.c @@ -0,0 +1,597 @@ +#include <stdarg.h> +#include <stddef.h> +#include <setjmp.h> +#include "cmockery.h" + +#include "c.h" +#include "hd_work_mgr_mock.h" + + +/* + * check that datanode dn has the expected + * ip, port, dn blocks list, and number of read & residing fragments + */ +void check_dn_load(DatanodeProcessingLoad* dn, + const char* expected_hostip, + int expected_port, + List* expected_blocks, + int expected_read_fragments, + int expected_residing_fragments) +{ + + ListCell* cell = NULL; + int blocks_number = 0; + + assert_string_equal(dn->dataNodeIp, expected_hostip); + assert_int_equal(dn->port, expected_port); + assert_int_equal(dn->num_fragments_read, expected_read_fragments); + assert_int_equal(dn->num_fragments_residing, expected_residing_fragments); + + assert_int_equal(list_length(expected_blocks), list_length(dn->datanodeBlocks)); + blocks_number = list_length(expected_blocks); + for (int i = 0; i < blocks_number; ++i) + { + AllocatedDataFragment* block = + (AllocatedDataFragment*)lfirst(list_nth_cell(dn->datanodeBlocks, i)); + AllocatedDataFragment* expected_block = + (AllocatedDataFragment*)lfirst(list_nth_cell(expected_blocks, i)); + assert_string_equal(block->host, expected_block->host); + assert_int_equal(block->rest_port, expected_block->rest_port); + assert_string_equal(block->source_name, expected_block->source_name); + assert_string_equal(block->fragment_md, expected_block->fragment_md); + assert_string_equal(block->user_data, expected_block->user_data); + assert_int_equal(block->index, expected_block->index); + } + +} + +/* + * Test get_dn_processing_load: + * starting with an empty list, add fragment locations one by one + * and check that the returned element matches the given fragment, + * and that a new element is added to the list only when the location is new.
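+ * For example, feeding the (ip, port) pairs (1.2.3.1, 100), (1.2.3.2, 100), + * (1.2.3.1, 100) should leave two elements in the list, with the repeated pair + * returning the element created on its first appearance - exactly what the + * ip/port/expected-size arrays below encode.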
+ */ +void +test__get_dn_processing_load(void **state) +{ + List* allDNProcessingLoads = NIL; + ListCell* cell = NULL; + FragmentHost *fragment_loc = NULL; + DatanodeProcessingLoad* dn_found = NULL; + + char* ip_array[] = + { + "1.2.3.1", + "1.2.3.2", + "1.2.3.3", + "1.2.3.1", + "1.2.3.1", + "1.2.3.3", + "1.2.3.1" + }; + int port_array[] = + { + 100, + 100, + 100, + 100, + 100, + 100, + 200 + }; + int expected_list_size[] = + { + 1, 2, 3, 3, 3, 3, 4 + }; + int array_size = 7; + /* sanity */ + assert_int_equal(array_size, (sizeof(ip_array) / sizeof(ip_array[0]))); + assert_int_equal(array_size, (sizeof(port_array) / sizeof(port_array[0]))); + assert_int_equal(array_size, (sizeof(expected_list_size) / sizeof(expected_list_size[0]))); + + fragment_loc = (FragmentHost*) palloc0(sizeof(FragmentHost)); + + for (int i = 0; i < array_size; ++i) + { + dn_found = NULL; + fragment_loc->ip = ip_array[i]; + fragment_loc->rest_port = port_array[i]; + + dn_found = get_dn_processing_load(&allDNProcessingLoads, fragment_loc); + + assert_true(dn_found != NULL); + assert_int_equal(list_length(allDNProcessingLoads), expected_list_size[i]); + + check_dn_load(dn_found, ip_array[i], port_array[i], NIL, 0, 0); + } + + pfree(fragment_loc); + /* free allDNProcessingLoads */ + foreach(cell, allDNProcessingLoads) + { + DatanodeProcessingLoad* cell_data = (DatanodeProcessingLoad*)lfirst(cell); + pfree(cell_data->dataNodeIp); + } + list_free(allDNProcessingLoads); +} + +/* + * create, verify and free AllocatedDataFragment + */ +void check_create_allocated_fragment(DataFragment* fragment, bool has_user_data) +{ + AllocatedDataFragment* allocated = create_allocated_fragment(fragment); + + assert_true(allocated != NULL); + assert_int_equal(allocated->index, fragment->index); + assert_string_equal(allocated->source_name, fragment->source_name); + assert_string_equal(allocated->fragment_md, fragment->fragment_md); + if (has_user_data) + assert_string_equal(allocated->user_data, fragment->user_data); + else + assert_true(allocated->user_data == NULL); + assert_true(allocated->host == NULL); + assert_int_equal(allocated->rest_port, 0); + + if (allocated->source_name) + pfree(allocated->source_name); + if (allocated->user_data) + pfree(allocated->user_data); + pfree(allocated); +} + + +/* + * Test create_allocated_fragment without user_data. + */ +void +test__create_allocated_fragment__NoUserData(void **state) +{ + AllocatedDataFragment* allocated = NULL; + + DataFragment* fragment = (DataFragment*) palloc0(sizeof(DataFragment)); + fragment->index = 13; + fragment->source_name = "source name!!!"; + fragment->user_data = NULL; + fragment->fragment_md = "METADATA"; + + check_create_allocated_fragment(fragment, false); + + pfree(fragment); +} + +/* + * Test create_allocated_fragment with user_data. + */ +void +test__create_allocated_fragment__WithUserData(void **state) +{ + AllocatedDataFragment* allocated = NULL; + + DataFragment* fragment = (DataFragment*) palloc0(sizeof(DataFragment)); + fragment->index = 13; + fragment->source_name = "source name!!!"; + fragment->user_data = "Wish You Were Here"; + fragment->fragment_md = "METADATA"; + + check_create_allocated_fragment(fragment, true); + + pfree(fragment); +} + +/* + * Creates a list of DataFragment for one file ("file.txt"). + * The important thing here is the fragments' location. It is determined by the parameters: + * replication_factor - number of copies of each fragment on the different hosts.
+ * number_of_hosts - number of hosts, so that the IP pool we use is 1.2.3.{1-number_of_hosts} + * number_of_fragments - number of fragments in the file. + * + * Each fragment will have replication_factor hosts: replica j of fragment i + * is placed on IP 1.2.3.(((i + j) % number_of_hosts) + 1). + * That way there is some overlap between the hosts of each fragment. + */ +List* build_data_fragments_list(int number_of_fragments, int number_of_hosts, int replication_factor) +{ + List* fragments_list = NIL; + StringInfoData string_info; + initStringInfo(&string_info); + + for (int i = 0; i < number_of_fragments; ++i) + { + DataFragment* fragment = (DataFragment*) palloc0(sizeof(DataFragment)); + + fragment->index = i; + fragment->source_name = pstrdup("file.txt"); + + for (int j = 0; j < replication_factor; ++j) + { + FragmentHost* fhost = (FragmentHost*)palloc0(sizeof(FragmentHost)); + appendStringInfo(&string_info, "1.2.3.%d", ((j + i) % number_of_hosts) + 1); + fhost->ip = pstrdup(string_info.data); + resetStringInfo(&string_info); + fragment->replicas = lappend(fragment->replicas, fhost); + } + assert_int_equal(list_length(fragment->replicas), replication_factor); + appendStringInfo(&string_info, "metadata %d", i); + fragment->fragment_md = pstrdup(string_info.data); + resetStringInfo(&string_info); + appendStringInfo(&string_info, "user data %d", i); + fragment->user_data = pstrdup(string_info.data); + resetStringInfo(&string_info); + fragments_list = lappend(fragments_list, fragment); + } + + pfree(string_info.data); + return fragments_list; +} + +/* + * Returns a list of AllocatedDataFragment. + */ +List* build_blank_blocks_list(int list_size) +{ + List* blocks_list = NIL; + for (int i = 0; i < list_size; ++i) + { + AllocatedDataFragment* block = (AllocatedDataFragment*) palloc0(sizeof(AllocatedDataFragment)); + blocks_list = lappend(blocks_list, block); + } + return blocks_list; +} + +/* + * Tests allocate_fragments_to_datanodes with 4 fragments over 10 hosts, with 3 replicas each, + * so that they are distributed as follows: + * Fragment 0 : on hosts 1.2.3.1, 1.2.3.2, 1.2.3.3. + * Fragment 1 : on hosts 1.2.3.2, 1.2.3.3, 1.2.3.4. + * Fragment 2 : on hosts 1.2.3.3, 1.2.3.4, 1.2.3.5. + * Fragment 3 : on hosts 1.2.3.4, 1.2.3.5, 1.2.3.6. + * + * The expected distribution is as follows: + * Host 1.2.3.1: fragment 0. + * Host 1.2.3.2: fragment 1. + * Host 1.2.3.3: fragment 2. + * Host 1.2.3.4: fragment 3. + * Hosts 1.2.3.5, 1.2.3.6: no fragments.
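+ * (Each fragment is evidently assigned to its least-loaded replica host, which is + * why hosts 1.2.3.5 and 1.2.3.6 still show nonzero num_fragments_residing counts + * in the checks below despite receiving no work.)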
+ */ +void +test__allocate_fragments_to_datanodes__4Fragments10Hosts3Replicates(void **state) +{ + List* allDNProcessingLoads = NIL; + List* data_fragment_list = NIL; + List* blocks_list = NIL; + ListCell* cell = NULL; + DatanodeProcessingLoad* datanode_load = NULL; + AllocatedDataFragment* allocated = NULL; + + data_fragment_list = build_data_fragments_list(4, 10, 3); + assert_int_equal(4, list_length(data_fragment_list)); + + allDNProcessingLoads = allocate_fragments_to_datanodes(data_fragment_list); + + assert_true(allDNProcessingLoads != NULL); + assert_int_equal(6, list_length(allDNProcessingLoads)); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 0)); + blocks_list = build_blank_blocks_list(1); + allocated = lfirst(list_nth_cell(blocks_list, 0)); + allocated->host = "1.2.3.1"; + allocated->index = 0; + allocated->rest_port = 0; + allocated->source_name = "file.txt"; + allocated->fragment_md = "metadata 0"; + allocated->user_data = "user data 0"; + check_dn_load(datanode_load, + "1.2.3.1", 0, blocks_list, 1, 1); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 1)); + allocated->host = "1.2.3.2"; + allocated->index = 1; + allocated->fragment_md = "metadata 1"; + allocated->user_data = "user data 1"; + check_dn_load(datanode_load, + "1.2.3.2", 0, blocks_list, 1, 2); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 2)); + allocated->host = "1.2.3.3"; + allocated->index = 2; + allocated->fragment_md = "metadata 2"; + allocated->user_data = "user data 2"; + check_dn_load(datanode_load, + "1.2.3.3", 0, blocks_list, 1, 3); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 3)); + allocated->host = "1.2.3.4"; + allocated->index = 3; + allocated->fragment_md = "metadata 3"; + allocated->user_data = "user data 3"; + check_dn_load(datanode_load, + "1.2.3.4", 0, blocks_list, 1, 3); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 4)); + check_dn_load(datanode_load, + "1.2.3.5", 0, NIL, 0, 2); + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 5)); + check_dn_load(datanode_load, + "1.2.3.6", 0, NIL, 0, 1); + + /* cleanup */ + list_free(blocks_list); + foreach (cell, allDNProcessingLoads) + { + DatanodeProcessingLoad* dn = (DatanodeProcessingLoad*)lfirst(cell); + free_allocated_frags(dn->datanodeBlocks); + } + list_free(allDNProcessingLoads); +} + +/* + * Tests allocate_fragments_to_datanodes with 4 fragments over 3 hosts, with 2 replicates each, + * so that they are distributed as follows: + * Fragment 0 : on hosts 1.2.3.1, 1.2.3.2. + * Fragment 1 : on hosts 1.2.3.2, 1.2.3.3. + * Fragment 2 : on hosts 1.2.3.3, 1.2.3.1. + * Fragment 3 : on hosts 1.2.3.1, 1.2.3.2. + * + * The expected distribution is as follows: + * Host 1.2.3.1: fragments 0, 3. + * Host 1.2.3.2: fragment 1. + * Host 1.2.3.3: fragment 2. 
+ */ +void +test__allocate_fragments_to_datanodes__4Fragments3Hosts2Replicates(void **state) +{ + List* allDNProcessingLoads = NIL; + List* data_fragment_list = NIL; + List* blocks_list = NIL; + ListCell* cell = NULL; + DatanodeProcessingLoad* datanode_load = NULL; + AllocatedDataFragment* allocated = NULL; + + data_fragment_list = build_data_fragments_list(4, 3, 2); + assert_int_equal(4, list_length(data_fragment_list)); + + allDNProcessingLoads = allocate_fragments_to_datanodes(data_fragment_list); + + assert_true(allDNProcessingLoads != NULL); + assert_int_equal(3, list_length(allDNProcessingLoads)); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 0)); + blocks_list = build_blank_blocks_list(2); + allocated = lfirst(list_nth_cell(blocks_list, 0)); + allocated->host = "1.2.3.1"; + allocated->index = 0; + allocated->rest_port = 0; + allocated->source_name = "file.txt"; + allocated->fragment_md = "metadata 0"; + allocated->user_data = "user data 0"; + allocated = lfirst(list_nth_cell(blocks_list, 1)); + allocated->host = "1.2.3.1"; + allocated->index = 3; + allocated->rest_port = 0; + allocated->source_name = "file.txt"; + allocated->fragment_md = "metadata 3"; + allocated->user_data = "user data 3"; + check_dn_load(datanode_load, + "1.2.3.1", 0, blocks_list, 2, 3); + list_free(blocks_list); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 1)); + blocks_list = build_blank_blocks_list(1); + allocated = lfirst(list_nth_cell(blocks_list, 0)); + allocated->host = "1.2.3.2"; + allocated->index = 1; + allocated->rest_port = 0; + allocated->source_name = "file.txt"; + allocated->fragment_md = "metadata 1"; + allocated->user_data = "user data 1"; + check_dn_load(datanode_load, + "1.2.3.2", 0, blocks_list, 1, 3); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 2)); + allocated->host = "1.2.3.3"; + allocated->index = 2; + allocated->fragment_md = "metadata 2"; + allocated->user_data = "user data 2"; + check_dn_load(datanode_load, + "1.2.3.3", 0, blocks_list, 1, 2); + + /* cleanup */ + list_free(blocks_list); + foreach (cell, allDNProcessingLoads) + { + DatanodeProcessingLoad* dn = (DatanodeProcessingLoad*)lfirst(cell); + free_allocated_frags(dn->datanodeBlocks); + } + list_free(allDNProcessingLoads); +} + +/* + * Tests allocate_fragments_to_datanodes with 4 fragments over 3 hosts, with 1 replicates each, + * so that they are distributed as follows: + * Fragment 0 : on hosts 1.2.3.1. + * Fragment 1 : on hosts 1.2.3.2. + * Fragment 2 : on hosts 1.2.3.3. + * Fragment 3 : on hosts 1.2.3.1. + * + * The expected distribution is as follows: + * Host 1.2.3.1: fragments 0, 3. + * Host 1.2.3.2: fragment 1. + * Host 1.2.3.3: fragment 2. 
+ */ +void +test__allocate_fragments_to_datanodes__4Fragments3Hosts1Replicates(void **state) +{ + List* allDNProcessingLoads = NIL; + List* data_fragment_list = NIL; + List* blocks_list = NIL; + ListCell* cell = NULL; + DatanodeProcessingLoad* datanode_load = NULL; + AllocatedDataFragment* allocated = NULL; + + data_fragment_list = build_data_fragments_list(4, 3, 1); + assert_int_equal(4, list_length(data_fragment_list)); + + allDNProcessingLoads = allocate_fragments_to_datanodes(data_fragment_list); + + assert_true(allDNProcessingLoads != NULL); + assert_int_equal(3, list_length(allDNProcessingLoads)); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 0)); + blocks_list = build_blank_blocks_list(2); + allocated = lfirst(list_nth_cell(blocks_list, 0)); + allocated->host = "1.2.3.1"; + allocated->index = 0; + allocated->rest_port = 0; + allocated->source_name = "file.txt"; + allocated->fragment_md = "metadata 0"; + allocated->user_data = "user data 0"; + allocated = lfirst(list_nth_cell(blocks_list, 1)); + allocated->host = "1.2.3.1"; + allocated->index = 3; + allocated->rest_port = 0; + allocated->source_name = "file.txt"; + allocated->fragment_md = "metadata 3"; + allocated->user_data = "user data 3"; + check_dn_load(datanode_load, + "1.2.3.1", 0, blocks_list, 2, 2); + list_free(blocks_list); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 1)); + blocks_list = build_blank_blocks_list(1); + allocated = lfirst(list_nth_cell(blocks_list, 0)); + allocated->host = "1.2.3.2"; + allocated->index = 1; + allocated->rest_port = 0; + allocated->source_name = "file.txt"; + allocated->fragment_md = "metadata 1"; + allocated->user_data = "user data 1"; + check_dn_load(datanode_load, + "1.2.3.2", 0, blocks_list, 1, 1); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 2)); + allocated->host = "1.2.3.3"; + allocated->index = 2; + allocated->fragment_md = "metadata 2"; + allocated->user_data = "user data 2"; + check_dn_load(datanode_load, + "1.2.3.3", 0, blocks_list, 1, 1); + + /* cleanup */ + list_free(blocks_list); + foreach (cell, allDNProcessingLoads) + { + DatanodeProcessingLoad* dn = (DatanodeProcessingLoad*)lfirst(cell); + free_allocated_frags(dn->datanodeBlocks); + } + list_free(allDNProcessingLoads); +} + +/* + * Tests allocate_fragments_to_datanodes with 7 fragments over 10 hosts, with 1 replicates each, + * so that they are distributed as follows: + * Fragment 0 : on hosts 1.2.3.1. + * Fragment 1 : on hosts 1.2.3.2. + * Fragment 2 : on hosts 1.2.3.3. + * Fragment 3 : on hosts 1.2.3.4. + * Fragment 4 : on hosts 1.2.3.5. + * Fragment 5 : on hosts 1.2.3.6. + * Fragment 6 : on hosts 1.2.3.7. + * + * The expected distribution is as follows: + * Host 1.2.3.1: fragment 0. + * Host 1.2.3.2: fragment 1. + * Host 1.2.3.3: fragment 2. + * Host 1.2.3.4: fragment 3. + * Host 1.2.3.5: fragment 4. + * Host 1.2.3.6: fragment 5. + * Host 1.2.3.7: fragment 6. + * Hosts 1.2.3.8, 1.2.3.9, 1.2.3.10: no fragments. 
+ */ +void +test__allocate_fragments_to_datanodes__7Fragments10Hosts1Replicates(void **state) +{ + List* allDNProcessingLoads = NIL; + List* data_fragment_list = NIL; + List* blocks_list = NIL; + ListCell* cell = NULL; + DatanodeProcessingLoad* datanode_load = NULL; + AllocatedDataFragment* allocated = NULL; + + data_fragment_list = build_data_fragments_list(7, 10, 1); + assert_int_equal(7, list_length(data_fragment_list)); + + allDNProcessingLoads = allocate_fragments_to_datanodes(data_fragment_list); + + assert_true(allDNProcessingLoads != NULL); + assert_int_equal(7, list_length(allDNProcessingLoads)); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 0)); + blocks_list = build_blank_blocks_list(1); + allocated = lfirst(list_nth_cell(blocks_list, 0)); + allocated->host = "1.2.3.1"; + allocated->index = 0; + allocated->rest_port = 0; + allocated->source_name = "file.txt"; + allocated->fragment_md = "metadata 0"; + allocated->user_data = "user data 0"; + check_dn_load(datanode_load, + "1.2.3.1", 0, blocks_list, 1, 1); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 1)); + allocated->host = "1.2.3.2"; + allocated->index = 1; + allocated->fragment_md = "metadata 1"; + allocated->user_data = "user data 1"; + check_dn_load(datanode_load, + "1.2.3.2", 0, blocks_list, 1, 1); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 2)); + allocated->host = "1.2.3.3"; + allocated->index = 2; + allocated->fragment_md = "metadata 2"; + allocated->user_data = "user data 2"; + check_dn_load(datanode_load, + "1.2.3.3", 0, blocks_list, 1, 1); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 3)); + allocated->host = "1.2.3.4"; + allocated->index = 3; + allocated->fragment_md = "metadata 3"; + allocated->user_data = "user data 3"; + check_dn_load(datanode_load, + "1.2.3.4", 0, blocks_list, 1, 1); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 4)); + allocated->host = "1.2.3.5"; + allocated->index = 4; + allocated->fragment_md = "metadata 4"; + allocated->user_data = "user data 4"; + check_dn_load(datanode_load, + "1.2.3.5", 0, blocks_list, 1, 1); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 5)); + allocated->host = "1.2.3.6"; + allocated->index = 5; + allocated->fragment_md = "metadata 5"; + allocated->user_data = "user data 5"; + check_dn_load(datanode_load, + "1.2.3.6", 0, blocks_list, 1, 1); + + datanode_load = (DatanodeProcessingLoad*)lfirst(list_nth_cell(allDNProcessingLoads, 6)); + allocated->host = "1.2.3.7"; + allocated->index = 6; + allocated->fragment_md = "metadata 6"; + allocated->user_data = "user data 6"; + check_dn_load(datanode_load, + "1.2.3.7", 0, blocks_list, 1, 1); + + /* cleanup */ + list_free(blocks_list); + foreach (cell, allDNProcessingLoads) + { + DatanodeProcessingLoad* dn = (DatanodeProcessingLoad*)lfirst(cell); + free_allocated_frags(dn->datanodeBlocks); + } + list_free(allDNProcessingLoads); +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/hd_work_mgr_distribute_work_2_gp_segments_test.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/test/hd_work_mgr_distribute_work_2_gp_segments_test.c b/src/backend/access/external/test/hd_work_mgr_distribute_work_2_gp_segments_test.c new file mode 100644 index 0000000..2b940ff --- /dev/null +++ 
b/src/backend/access/external/test/hd_work_mgr_distribute_work_2_gp_segments_test.c @@ -0,0 +1,623 @@ +#include <stdarg.h> +#include <stddef.h> +#include <setjmp.h> +#include "cmockery.h" + +#include "c.h" +#include "hd_work_mgr_mock.h" + + +/* + * In test__distribute_work_2_gp_segments we are testing the function distribute_work_2_gp_segments(). + * distribute_work_2_gp_segments() implements the algorithm that allocates the fragments of an external + * data source to the Hawq segments for processing. + * This unit test verifies the algorithm output, and ensures the following algorithm behaviour: + * a. The number of fragments allocated is equal to the input number of fragments + * b. distribution of work between segments: if there are two segments required and more than + * one host, each segment will be on a different host + * c. percent of local datanodes out of all datanodes + * d. percent of local fragments out of all fragments + * e. number of actual working segments is bigger than half of the initial working segments + */ + +static void print_allocated_fragments(List **allocated_fragments, int total_segs); +static char* print_one_allocated_data_fragment(AllocatedDataFragment *frag, int seg_index); +static char* find_segment_ip_by_index(int seg_index); +static char** create_cluster(int num_hosts); +static void clean_cluster(char** cluster, int num_hosts); +static char** create_array_of_segs(char **cluster, int num_hosts, int num_segments_on_host); +static void clean_array_of_segs(char **array_of_segs, int number_of_segments); +static bool* create_array_of_primaries(int number_of_segments); +static void print_cluster(char** cluster, int num_hosts); +static void print_segments_list(); +void clean_allocated_fragments(List **allocated_fragments, int total_segs); +static void validate_total_fragments_allocated(List **allocated_fragments, int total_segs, int input_total_fragments); +static void validate_max_load_per_segment(List **allocated_fragments, int total_segs, int working_segs, int input_total_fragments); +static int calc_load_per_segment(int input_total_fragments, int working_segs); +static void validate_all_working_segments_engagement(List **allocated_fragments, + int total_segs, + int working_segs, + int input_total_fragments, + int num_hosts_in_cluster); +static bool is_host_uniq(List** ips_list, char* ip); +static List* spread_fragments_in_cluster(int number_of_fragments, + int number_of_hosts, + int replication_factor, + char **cluster, + int cluster_size); + +/* test input data */ +typedef struct sTestInputData +{ + int m_num_hosts_in_cluster; /* cluster size mustn't exceed 65025 - see function create_cluster() */ + int m_num_data_fragments; /* number of fragments in the data we intend to allocate between the hawq segments */ + /* + * number of datanodes that hold the 'queried' data - there is one datanode + * on each cluster host - so there are m_num_active_data_nodes datanodes + */ + int m_num_active_data_nodes; + int m_num_of_fragment_replicas; + int m_num_segments_on_host; /* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */ + /* + * the subset of Hawq segments that will do the processing - not all the Hawq segments + * in the cluster are involved.
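+ * (For example, with 1000 hosts x 1 segment each and m_num_working_segs = 64, only + * 64 of the 1000 segments should end up with fragments - see the big_cluster tests.)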
+ * This parameter plays the role of max_participants_allowed that is passed to map_hddata_2gp_segments() + * in createplan.c + */ + int m_num_working_segs; + bool m_enable_print_input_cluster; + bool m_enable_print_input_fragments; + bool m_enable_print_input_segments; + bool m_enable_print_allocated_fragments; +} TestInputData; + +static void test__distribute_work_to_gp_segments(TestInputData *input); +/* + * TRACING CAPABILITIES + * The unit test validates the behaviour of the SUT function distribute_work_2_gp_segments() using + * the assert_XXX_... functions. But in order to understand the behaviour of the allocation algorithm + * it can be helpful to look at the various data structures involved. For this purpose we have + * several print functions: + * a. print_cluster(...) + * b. print_fragment_list(...) + * c. print_segments_list(...) + * d. print_allocated_fragments(...) + * All these trace functions have their output disabled by default. To enable the output of any + * print function, set the corresponding m_enable_print_... booleans in the TestInputData passed to + * test__distribute_work_to_gp_segments() + */ + + +void +test__distribute_work_to_gp_segments__big_cluster_few_active_nodes(void **state) +{ + TestInputData *input = (TestInputData*)palloc0(sizeof(TestInputData)); + + input->m_num_hosts_in_cluster = 1000; /* cluster size mustn't exceed 65025 - see function create_cluster() */ + input->m_num_data_fragments = 100; /* number of fragments in the data we intend to allocate between the hawq segments */ + input->m_num_active_data_nodes = 10; /* number of datanodes that hold the 'queried' data - there is one datanode on each cluster host - so there are m_num_active_data_nodes datanodes */ + input->m_num_of_fragment_replicas = 3; + input->m_num_segments_on_host = 1; /* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */ + input->m_num_working_segs = 64; /* the subset of Hawq segments that will do the processing - not all the Hawq segments in the cluster are involved */ + input->m_enable_print_input_cluster = false; + input->m_enable_print_input_fragments = false; + input->m_enable_print_input_segments = false; + input->m_enable_print_allocated_fragments = false; + + test__distribute_work_to_gp_segments(input); + + pfree(input); +} + +void +test__distribute_work_to_gp_segments__big_cluster_many_active_nodes(void **state) +{ + TestInputData *input = (TestInputData*)palloc0(sizeof(TestInputData)); + + input->m_num_hosts_in_cluster = 1000; /* cluster size mustn't exceed 65025 - see function create_cluster() */ + input->m_num_data_fragments = 100; /* number of fragments in the data we intend to allocate between the hawq segments */ + input->m_num_active_data_nodes = 100; /* number of datanodes that hold the 'queried' data - there is one datanode on each cluster host - so there are m_num_active_data_nodes datanodes */ + input->m_num_of_fragment_replicas = 3; + input->m_num_segments_on_host = 4; /* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */ + input->m_num_working_segs = 64; /* the subset of Hawq segments that will do the processing - not all the Hawq segments in the cluster are involved */ + input->m_enable_print_input_cluster = false; + input->m_enable_print_input_fragments = false; + input->m_enable_print_input_segments = false; + input->m_enable_print_allocated_fragments = false; + + test__distribute_work_to_gp_segments(input); + pfree(input); +} + +void +test__distribute_work_to_gp_segments__small_cluster(void **state) +{ + TestInputData
*input = (TestInputData*)palloc0(sizeof(TestInputData)); + + input->m_num_hosts_in_cluster = 100; /* cluster size mustn't exceed 65025 - see function create_cluster() */ + input->m_num_data_fragments = 100; /* number of fragments in the data we intend to allocate between the hawq segments */ + input->m_num_active_data_nodes = 50; /* number of datanodes that hold the 'queried' data - there is one datanode on each cluster host - so there are m_num_active_data_nodes datanodes */ + input->m_num_of_fragment_replicas = 3; + input->m_num_segments_on_host = 4; /* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */ + input->m_num_working_segs = 64; /* the subset of Hawq segments that will do the processing - not all the Hawq segments in the cluster are involved */ + input->m_enable_print_input_cluster = false; + input->m_enable_print_input_fragments = false; + input->m_enable_print_input_segments = false; + input->m_enable_print_allocated_fragments = false; + + test__distribute_work_to_gp_segments(input); + pfree(input); +} + +void +test__distribute_work_to_gp_segments__small_cluster_many_active_nodes(void **state) +{ + TestInputData *input = (TestInputData*)palloc0(sizeof(TestInputData)); + + input->m_num_hosts_in_cluster = 100; /* cluster size mustn't exceed 65025 - see function create_cluster() */ + input->m_num_data_fragments = 100; /* number of fragments in the data we intend to allocate between the hawq segments */ + input->m_num_active_data_nodes = 90; /* number of datanodes that hold the 'queried' data - there is one datanode on each cluster host - so there are m_num_active_data_nodes datanodes */ + input->m_num_of_fragment_replicas = 3; + input->m_num_segments_on_host = 4; /* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */ + input->m_num_working_segs = 64; /* the subset of Hawq segments that will do the processing - not all the Hawq segments in the cluster are involved */ + input->m_enable_print_input_cluster = false; + input->m_enable_print_input_fragments = false; + input->m_enable_print_input_segments = false; + input->m_enable_print_allocated_fragments = false; + + test__distribute_work_to_gp_segments(input); + pfree(input); +} + +void +test__distribute_work_to_gp_segments__small_cluster_few_replicas(void **state) +{ + TestInputData *input = (TestInputData*)palloc0(sizeof(TestInputData)); + + input->m_num_hosts_in_cluster = 100; /* cluster size mustn't exceed 65025 - see function create_cluster() */ + input->m_num_data_fragments = 100; /* number of fragments in the data we intend to allocate between the hawq segments */ + input->m_num_active_data_nodes = 90; /* number of datanodes that hold the 'queried' data - there is one datanode on each cluster host - so there are m_num_active_data_nodes datanodes */ + input->m_num_of_fragment_replicas = 2; + input->m_num_segments_on_host = 4; /* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */ + input->m_num_working_segs = 64; /* the subset of Hawq segments that will do the processing - not all the Hawq segments in the cluster are involved */ + input->m_enable_print_input_cluster = false; + input->m_enable_print_input_fragments = false; + input->m_enable_print_input_segments = false; + input->m_enable_print_allocated_fragments = false; + + test__distribute_work_to_gp_segments(input); + pfree(input); +} + +/* + * Testing distribute_work_2_gp_segments + */
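+/* + * Each scenario above drives the same harness below: build a synthetic cluster, + * spread fragment replicas over a subset of its hosts, register the Hawq segments + * and query resource, run distribute_work_2_gp_segments(), then validate the + * invariants (all fragments allocated, per-segment load cap, enough working + * segments engaged) before cleaning up. + */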
+static void test__distribute_work_to_gp_segments(TestInputData *input) +{ + List **segs_allocated_data = NULL; + List * input_fragments_list = NIL; + char** array_of_segs = NULL; + bool *array_of_primaries; + int total_segs; + bool cluster_size_not_exceeded = input->m_num_hosts_in_cluster <= 65025; + + assert_true(cluster_size_not_exceeded); + /* + * 1. Initialize the test input parameters + * We are testing an N hosts cluster. The size of the cluster is set in this section - section 1. + * Basic test assumptions: + * a. There is one datanode on each host in the cluster + * b. There are Hawq segments on each host in the cluster. + * c. There is an equal number of Hawq segments on each host - hardcoded in this section + */ + int num_hosts_in_cluster = input->m_num_hosts_in_cluster; /* cluster size mustn't exceed 65025 - see function create_cluster() */ + int num_data_fragments = input->m_num_data_fragments; /* number of fragments in the data we intend to allocate between the hawq segments */ + int num_active_data_nodes = input->m_num_active_data_nodes; /* number of datanodes that hold the 'queried' data - there is one datanode on each cluster host - so there are num_active_data_nodes datanodes */ + int num_of_fragment_replicas = input->m_num_of_fragment_replicas; + int num_segments_on_host = input->m_num_segments_on_host; /* number of Hawq segments on each cluster host - we assume all cluster hosts have Hawq segments installed */ + int num_working_segs = input->m_num_working_segs; /* the subset of Hawq segments that will do the processing - not all the Hawq segments in the cluster are involved */ + bool enable_print_input_cluster = input->m_enable_print_input_cluster; + bool enable_print_input_fragments = input->m_enable_print_input_fragments; + bool enable_print_input_segments = input->m_enable_print_input_segments; + bool enable_print_allocated_fragments = input->m_enable_print_allocated_fragments; + + /* 2. Create the cluster */ + char **cluster = create_cluster(num_hosts_in_cluster); + + if (enable_print_input_cluster) + print_cluster(cluster, num_hosts_in_cluster); + + /* 3. Input - data fragments */ + input_fragments_list = spread_fragments_in_cluster(num_data_fragments, /* number of fragments in the data we are about to allocate */ + num_active_data_nodes, /* hosts */ + num_of_fragment_replicas, /* replicas */ + cluster, /* the whole cluster */ + num_hosts_in_cluster /* the number of hosts in the cluster */); + if (enable_print_input_fragments) + print_fragment_list(input_fragments_list); + + /* 4. Input - Hawq segments */ + total_segs = num_hosts_in_cluster * num_segments_on_host; + array_of_segs = create_array_of_segs(cluster, num_hosts_in_cluster, num_segments_on_host); + array_of_primaries = create_array_of_primaries(total_segs); + + buildCdbComponentDatabases(total_segs, array_of_segs, array_of_primaries); + if (enable_print_input_segments) + print_segments_list(); + + /* 5. Build QueryResource */ + buildQueryResource(num_hosts_in_cluster*num_segments_on_host, array_of_segs); + will_return(GetActiveQueryResource, resource); + will_return(GetActiveQueryResource, resource); + + /* 6. The actual unit test of distribute_work_2_gp_segments() */ + segs_allocated_data = distribute_work_2_gp_segments(input_fragments_list, total_segs, num_working_segs); + if (enable_print_allocated_fragments) + print_allocated_fragments(segs_allocated_data, total_segs); +
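+ /* (With the default inputs - 100 fragments over 64 working segments - the cap + * checked by validate_max_load_per_segment() below is presumably + * calc_load_per_segment(100, 64) = ceil(100/64) = 2 fragments per segment.) */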
+
+    /* 7. The validations - verifying that the expected output was obtained */
+    validate_total_fragments_allocated(segs_allocated_data, total_segs, num_data_fragments);
+    validate_max_load_per_segment(segs_allocated_data, total_segs, num_working_segs, num_data_fragments);
+    validate_all_working_segments_engagement(segs_allocated_data, total_segs, num_working_segs, num_data_fragments, num_hosts_in_cluster);
+
+    /* 8. Cleanup */
+    freeQueryResource();
+    restoreCdbComponentDatabases();
+    clean_cluster(cluster, num_hosts_in_cluster);
+    clean_array_of_segs(array_of_segs, total_segs);
+    clean_allocated_fragments(segs_allocated_data, total_segs);
+    pfree(array_of_primaries);
+}
+
+/* create an array of segments based on the hosts in the cluster and the number of Hawq segments on each host */
+static char** create_array_of_segs(char **cluster, int num_hosts, int num_segments_on_host)
+{
+    int i, j;
+    int total_segs = num_hosts * num_segments_on_host;
+    char **array_of_segs = (char**)palloc0(total_segs * sizeof(char *));
+
+    for (i = 0; i < num_hosts; i++)
+    {
+        for (j = 0; j < num_segments_on_host; j++)
+        {
+            array_of_segs[i * num_segments_on_host + j] = pstrdup(cluster[i]);
+        }
+    }
+
+    return array_of_segs;
+}
+
+/* clean the array of Hawq segments */
+static void clean_array_of_segs(char **array_of_segs, int total_segments)
+{
+    int i;
+
+    for (i = 0; i < total_segments; i++)
+        pfree(array_of_segs[i]);
+    pfree(array_of_segs);
+}
+
+static bool* create_array_of_primaries(int total_segments)
+{
+    int i;
+    bool *primaries = (bool*)palloc0(total_segments * sizeof(bool));
+    for (i = 0; i < total_segments; i++)
+        primaries[i] = true;
+
+    return primaries;
+}
+
+/* gives an ip to each host in a num_hosts size cluster */
+static char** create_cluster(int num_hosts)
+{
+    char **cluster = (char**)palloc0(num_hosts * sizeof(char *));
+    int i;
+    char *prefix = "1.2.%d.%d";
+    int third_octet = 1; /* let's begin at 1 */
+    int fourth_octet = 1;
+    StringInfoData ip;
+    initStringInfo(&ip);
+
+    for (i = 0; i < num_hosts; i++)
+    {
+        appendStringInfo(&ip, prefix, third_octet, fourth_octet);
+        cluster[i] = pstrdup(ip.data);
+        /* this naming scheme will accommodate a cluster size of up to 255x255 = 65025 hosts */
+        fourth_octet++;
+        if (fourth_octet == 256)
+        {
+            fourth_octet = 1;
+            third_octet++;
+        }
+        resetStringInfo(&ip);
+    }
+
+    return cluster;
+}
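
As a quick illustration of the naming scheme above (an editorial sketch, not part of the commit): host 0 maps to 1.2.1.1, host 254 to 1.2.1.255, and host 255 wraps to 1.2.2.1, which is why capacity tops out at 255x255 = 65025 hosts. A minimal standalone C program tracing the same octet arithmetic:

#include <stdio.h>

/* Trace create_cluster()'s octet arithmetic for a few host indices. */
int main(void)
{
    int third_octet = 1, fourth_octet = 1;
    for (int i = 0; i < 258; i++)
    {
        if (i < 2 || (i >= 253 && i <= 256))
            printf("host %d -> 1.2.%d.%d\n", i, third_octet, fourth_octet);
        fourth_octet++;
        if (fourth_octet == 256)
        {
            fourth_octet = 1;
            third_octet++;
        }
    }
    return 0;
}
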
+
+/* release memory; returns NULL so callers can clear their pointer */
+static char** clean_cluster(char** cluster, int num_hosts)
+{
+    int i;
+
+    for (i = 0; i < num_hosts; i++)
+    {
+        if (cluster[i])
+            pfree(cluster[i]);
+    }
+    pfree(cluster);
+
+    return NULL;
+}
+
+/* show the cluster; returns the cluster unchanged */
+static char** print_cluster(char** cluster, int num_hosts)
+{
+    int i;
+    StringInfoData msg;
+    initStringInfo(&msg);
+
+    appendStringInfo(&msg, "cluster size: %d\n", num_hosts);
+    for (i = 0; i < num_hosts; i++)
+    {
+        if (cluster[i])
+            appendStringInfo(&msg, "cluster #%d: %s\n", i + 1, cluster[i]);
+        else
+            appendStringInfo(&msg, "cluster naming error\n");
+    }
+
+    elog(FRAGDEBUG, "%s", msg.data);
+    pfree(msg.data);
+
+    return cluster;
+}
+
+/* prints, for each segment, the index and the host ip */
+static void print_segments_list()
+{
+    StringInfoData msg;
+    CdbComponentDatabases *test_cdb = GpAliveSegmentsInfo.cdbComponentDatabases;
+    initStringInfo(&msg);
+
+    for (int i = 0; i < test_cdb->total_segment_dbs; ++i)
+    {
+        CdbComponentDatabaseInfo *component = &test_cdb->segment_db_info[i];
+        appendStringInfo(&msg, "\nsegment -- index: %d, ip: %s", component->segindex, component->hostip);
+    }
+
+    elog(FRAGDEBUG, "%s", msg.data);
+    pfree(msg.data);
+}
+
+/* returns the ip of the segment's host; asserts if the index is out of bounds */
+static char* find_segment_ip_by_index(int seg_index)
+{
+    CdbComponentDatabases *test_cdb = GpAliveSegmentsInfo.cdbComponentDatabases;
+    if (seg_index < 0 || seg_index >= test_cdb->total_segment_dbs)
+        assert_true(false);
+
+    for (int i = 0; i < test_cdb->total_segment_dbs; ++i)
+    {
+        CdbComponentDatabaseInfo *seg = &test_cdb->segment_db_info[i];
+        if (seg->segindex == seg_index)
+            return seg->hostip;
+    }
+
+    /* we assert if no segment with the supplied index was found */
+    assert_true(false);
+    return NULL;
+}
+
+/*
+ * print the allocated fragments list
+ * allocated_fragments is an array of lists. The size of the array is total_segs.
+ * The list located at index i in the array holds the fragments that will be processed
+ * by Hawq segment i
+ */
+static void print_allocated_fragments(List **allocated_fragments, int total_segs)
+{
+    StringInfoData msg;
+    initStringInfo(&msg);
+    appendStringInfo(&msg, "ALLOCATED FRAGMENTS FOR EACH SEGMENT:\n");
+
+    for (int i = 0; i < total_segs; i++)
+    {
+        if (allocated_fragments[i])
+        {
+            ListCell *frags_cell = NULL;
+            foreach(frags_cell, allocated_fragments[i])
+            {
+                AllocatedDataFragment *frag = (AllocatedDataFragment*)lfirst(frags_cell);
+                appendStringInfo(&msg, "%s\n", print_one_allocated_data_fragment(frag, i));
+            }
+        }
+    }
+
+    elog(FRAGDEBUG, "%s", msg.data);
+    if (msg.data)
+        pfree(msg.data);
+}
+
+/* print one allocated fragment */
+static char* print_one_allocated_data_fragment(AllocatedDataFragment *frag, int seg_index)
+{
+    StringInfoData msg;
+    initStringInfo(&msg);
+    char *seg_ip = find_segment_ip_by_index(seg_index);
+    if (!seg_ip)
+        seg_ip = "INVALID SEGMENT INDEX";
+    bool locality = (strcmp(frag->host, seg_ip) == 0);
+
+    appendStringInfo(&msg,
+                     "locality: %d, segment number: %d , segment ip: %s --- fragment index: %d, datanode host: %s, file: %s",
+                     locality, seg_index, seg_ip, frag->index, frag->host, frag->source_name);
+
+    return msg.data;
+}
+
+/* release memory of allocated_fragments */
+void clean_allocated_fragments(List **allocated_fragments, int total_segs)
+{
+    for (int i = 0; i < total_segs; i++)
+        if (allocated_fragments[i])
+            free_allocated_frags(allocated_fragments[i]);
+    pfree(allocated_fragments);
+}
+
+/* calculate the optimal load distribution per segment: ceil(total fragments / working segments) */
+static int calc_load_per_segment(int input_total_fragments, int working_segs)
+{
+    return (input_total_fragments % working_segs) ? input_total_fragments / working_segs + 1
+                                                  : input_total_fragments / working_segs;
+}
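
For concreteness (an editorial sketch, not part of the commit), the ceiling division in calc_load_per_segment() with the figures used by the tests above, 100 fragments over 64 working segments, yields a load of 2 fragments per segment:

#include <stdio.h>

/* Same ceiling-division rule as calc_load_per_segment() above. */
static int calc_load(int total_fragments, int working_segs)
{
    return (total_fragments % working_segs) ? total_fragments / working_segs + 1
                                            : total_fragments / working_segs;
}

int main(void)
{
    printf("%d\n", calc_load(100, 64)); /* remainder 36 -> rounds up to 2 */
    printf("%d\n", calc_load(128, 64)); /* divides evenly -> exactly 2 */
    return 0;
}
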
+
+/*
+ * test that a host is unique.
+ * the function ensures that ip names are unique by managing a set of ips;
+ * the set is implemented with a linked list
+ */
+static bool is_host_uniq(List** ips_list, char* ip)
+{
+    ListCell *cell;
+    foreach(cell, *ips_list)
+    {
+        char *foundip = (char*)lfirst(cell);
+        if (strcmp(foundip, ip) == 0)
+            return false;
+    }
+
+    *ips_list = lappend(*ips_list, ip); /* keep the result - lappend builds a new list when the list is empty */
+    return true;
+}
+
+/* validate that all input blocks were allocated */
+static void validate_total_fragments_allocated(List **allocated_fragments, int total_segs, int input_total_fragments)
+{
+    int total_fragments_allocated = 0;
+
+    for (int i = 0; i < total_segs; i++)
+    {
+        if (allocated_fragments[i])
+            total_fragments_allocated += list_length(allocated_fragments[i]);
+    }
+
+    assert_int_equal(total_fragments_allocated, input_total_fragments);
+}
+
+/* validate that the load per segment does not exceed the expected load */
+static void validate_max_load_per_segment(List **allocated_fragments, int total_segs, int working_segs, int input_total_fragments)
+{
+    int max_load = 0;
+    int load_per_segment = calc_load_per_segment(input_total_fragments, working_segs);
+
+    for (int i = 0; i < total_segs; i++)
+    {
+        if (allocated_fragments[i] && list_length(allocated_fragments[i]) > max_load)
+            max_load = list_length(allocated_fragments[i]);
+    }
+
+    bool load_per_segment_not_exceeded = load_per_segment >= max_load;
+    elog(FRAGDEBUG, "actual max_load: %d, expected load_per_segment: %d", max_load, load_per_segment);
+    assert_true(load_per_segment_not_exceeded);
+}
+
+/*
+ * we validate that every working segment is engaged, by verifying that when
+ * load_per_segment is greater than one, every working segment has allocated fragments,
+ * and that when load_per_segment is 1, the number of segments that got work
+ * equals the number of fragments
+ */
+static void validate_all_working_segments_engagement(List **allocated_fragments,
+                                                     int total_segs,
+                                                     int working_segs,
+                                                     int input_total_fragments,
+                                                     int num_hosts_in_cluster)
+{
+    List *ips_list = NIL;
+    int total_segs_engaged = 0;
+    int load_per_segment = calc_load_per_segment(input_total_fragments, working_segs);
+    bool require_full_distribution = num_hosts_in_cluster >= working_segs;
+
+    for (int i = 0; i < total_segs; i++)
+        if (allocated_fragments[i] && list_length(allocated_fragments[i]) > 0)
+        {
+            char *ip;
+            bool isuniq;
+            total_segs_engaged++;
+            if (require_full_distribution)
+            {
+                ip = find_segment_ip_by_index(i);
+                isuniq = is_host_uniq(&ips_list, ip);
+                assert_true(isuniq);
+            }
+        }
+
+    if (load_per_segment == 1)
+        assert_int_equal(total_segs_engaged, input_total_fragments);
+    else
+    {
+        bool total_segs_engaged_not_exceeded = total_segs_engaged <= working_segs;
+        assert_true(total_segs_engaged_not_exceeded);
+    }
+
+    /* clean memory; the ips are owned by the component databases, so free only the list cells */
+    list_free(ips_list);
+}
+
+/*
+ * Creates a list of DataFragment for one file ("file.txt").
+ * The important thing here is the fragments' location. It is determined by the parameters:
+ * replication_factor - number of copies of each fragment on the different hosts.
+ * number_of_hosts - number of hosts
+ * number_of_fragments - number of fragments in the file.
+ * cluster - holds the ips of all hosts in the cluster
+ *
+ * Each fragment will have hosts from the cluster
+ */
+static List*
+spread_fragments_in_cluster(int number_of_fragments,
+                            int number_of_hosts,
+                            int replication_factor,
+                            char **cluster,
+                            int cluster_size)
+{
+    int first_host, target_host;
+    List *fragments_list = NIL;
+    StringInfoData string_info;
+    initStringInfo(&string_info);
+
+    /* pick the first host in the cluster that will host the data. The fragments will be spread from this host onward */
+    first_host = 0;
+
+    target_host = first_host;
+
+    for (int i = 0; i < number_of_fragments; ++i)
+    {
+        DataFragment *fragment = (DataFragment*) palloc0(sizeof(DataFragment));
+
+        fragment->index = i;
+        fragment->source_name = pstrdup("file.txt");
+
+        for (int j = 0; j < replication_factor; ++j)
+        {
+            FragmentHost *fhost = (FragmentHost*)palloc0(sizeof(FragmentHost));
+            fhost->ip = pstrdup(cluster[target_host]); /* copy the ip directly rather than routing it through a format string */
+            fragment->replicas = lappend(fragment->replicas, fhost);
+
+            target_host = ((j + i + first_host) % number_of_hosts);
+        }
+        assert_int_equal(list_length(fragment->replicas), replication_factor);
+        appendStringInfo(&string_info, "metadata %d", i);
+        fragment->fragment_md = pstrdup(string_info.data);
+        resetStringInfo(&string_info);
+        appendStringInfo(&string_info, "user data %d", i);
+        fragment->user_data = pstrdup(string_info.data);
+        resetStringInfo(&string_info);
+        fragments_list = lappend(fragments_list, fragment);
+    }
+
+    pfree(string_info.data);
+    return fragments_list;
+}
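
To see the replica placement pattern spread_fragments_in_cluster() produces (an editorial sketch, not part of the commit): replica j of fragment i goes to the current target host, and the next target is (j + i + first_host) % number_of_hosts. A standalone trace with 4 fragments, 2 replicas, and 3 hosts:

#include <stdio.h>

/* Trace the target-host sequence used by spread_fragments_in_cluster(). */
int main(void)
{
    int hosts = 3, fragments = 4, replicas = 2;
    int first_host = 0, target_host = first_host;

    for (int i = 0; i < fragments; i++)
    {
        printf("fragment %d:", i);
        for (int j = 0; j < replicas; j++)
        {
            printf(" host %d", target_host);
            target_host = (j + i + first_host) % hosts;
        }
        printf("\n");
    }
    return 0;
}

In this trace both replicas of each fragment land on the same host; the helper itself asserts only on the replica count, not on replica distinctness.
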

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c b/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
new file mode 100644
index 0000000..3d3dbe5
--- /dev/null
+++ b/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
@@ -0,0 +1,99 @@
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "hd_work_mgr_mock.h"
+
+
+/*
+ * check that element list_index in segment_list
+ * has the expected hostip.
+ */
+void check_segment_info(List* segment_list, int list_index,
+                        const char* expected_hostip)
+{
+    CdbComponentDatabaseInfo *seg_info =
+        (CdbComponentDatabaseInfo*)lfirst(list_nth_cell(segment_list, list_index));
+    assert_string_equal(seg_info->hostip, expected_hostip);
+}
+
+/*
+ * Test clustering of segments to hosts.
+ * Environment: 10 segments over 3 hosts, all primary.
+ */
+void
+test__do_segment_clustering_by_host__10SegmentsOn3Hosts(void **state)
+{
+    List *groups = NIL;
+    ListCell *cell = NULL;
+    GpHost *gphost = NULL;
+    List *segs = NIL;
+    CdbComponentDatabaseInfo *seg_info = NULL;
+
+    char* array_of_segs[10] =
+        {"1.2.3.1", "1.2.3.1", "1.2.3.1", "1.2.3.1",
+         "1.2.3.2", "1.2.3.2", "1.2.3.2",
+         "1.2.3.3", "1.2.3.3", "1.2.3.3"
+        };
+    bool array_of_primaries[10] =
+        {
+         true, true, true, true,
+         true, true, true,
+         true, true, true
+        };
+    int number_of_segments = 10;
+    /* sanity */
+    assert_true(number_of_segments == (sizeof(array_of_segs) / sizeof(array_of_segs[0])));
+    assert_true(number_of_segments == (sizeof(array_of_primaries) / sizeof(array_of_primaries[0])));
+
+    buildCdbComponentDatabases(number_of_segments, array_of_segs, array_of_primaries);
+
+    CdbComponentDatabases *cdb = GpAliveSegmentsInfo.cdbComponentDatabases;
+
+    /* sanity for cdbComponentDatabases building */
+    assert_int_equal(cdb->total_segment_dbs, number_of_segments);
+    assert_string_equal(cdb->segment_db_info[4].hostip, array_of_segs[4]);
+
+    /* build QueryResource */
+    buildQueryResource(10, array_of_segs);
+    will_return(GetActiveQueryResource, resource);
+
+    /* test do_segment_clustering_by_host */
+    groups = do_segment_clustering_by_host();
+
+    assert_int_equal(list_length(groups), 3);
+
+    cell = list_nth_cell(groups, 0);
+    gphost = (GpHost*)lfirst(cell);
+    assert_string_equal(gphost->ip, array_of_segs[0]);
+    assert_int_equal(list_length(gphost->segs), 4);
+    for (int i = 0; i < 4; ++i)
+    {
+        check_segment_info(gphost->segs, i, "1.2.3.1");
+    }
+
+    cell = list_nth_cell(groups, 1);
+    gphost = (GpHost*)lfirst(cell);
+    assert_string_equal(gphost->ip, "1.2.3.2");
+    assert_int_equal(list_length(gphost->segs), 3);
+    for (int i = 0; i < 3; ++i)
+    {
+        check_segment_info(gphost->segs, i, "1.2.3.2");
+    }
+
+    cell = list_nth_cell(groups, 2);
+    gphost = (GpHost*)lfirst(cell);
+    assert_string_equal(gphost->ip, "1.2.3.3");
+    assert_int_equal(list_length(gphost->segs), 3);
+    for (int i = 0; i < 3; ++i)
+    {
+        check_segment_info(gphost->segs, i, "1.2.3.3");
+    }
+
+    freeQueryResource();
+    restoreCdbComponentDatabases();
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/hd_work_mgr_mock.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_mock.c b/src/backend/access/external/test/hd_work_mgr_mock.c
new file mode 100644
index 0000000..a03313c
--- /dev/null
+++ b/src/backend/access/external/test/hd_work_mgr_mock.c
@@ -0,0 +1,100 @@
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "hd_work_mgr_mock.h"
+
+/*
+ * Helper functions to create and restore the GpAliveSegmentsInfo.cdbComponentDatabases element
+ * used by hd_work_mgr
+ */
+
+/*
+ * Builds an array of CdbComponentDatabaseInfo.
+ * Each segment is assigned a sequence number and an ip.
+ * segs_num - the number of segments
+ * segs_hostips - array of the ip of each segment
+ * primaries_map - array of which segments are primaries
+ */
+void buildCdbComponentDatabases(int segs_num,
+                                char* segs_hostips[],
+                                bool primaries_map[])
+{
+    CdbComponentDatabases *test_cdb = palloc0(sizeof(CdbComponentDatabases));
+    CdbComponentDatabaseInfo *component = NULL;
+    test_cdb->total_segment_dbs = segs_num;
+    test_cdb->segment_db_info =
+        (CdbComponentDatabaseInfo *) palloc0(sizeof(CdbComponentDatabaseInfo) * test_cdb->total_segment_dbs);
+
+    for (int i = 0; i < test_cdb->total_segment_dbs; ++i)
+    {
+        component = &test_cdb->segment_db_info[i];
+        component->segindex = i;
+        component->role = primaries_map[i] ? SEGMENT_ROLE_PRIMARY : SEGMENT_ROLE_MIRROR;
+        component->hostip = pstrdup(segs_hostips[i]);
+    }
+
+    orig_cdb = GpAliveSegmentsInfo.cdbComponentDatabases;
+    orig_seg_count = GpAliveSegmentsInfo.aliveSegmentsCount;
+    GpAliveSegmentsInfo.cdbComponentDatabases = test_cdb;
+    GpAliveSegmentsInfo.aliveSegmentsCount = segs_num;
+}
+
+void restoreCdbComponentDatabases()
+{
+    /* free test CdbComponentDatabases */
+    if (GpAliveSegmentsInfo.cdbComponentDatabases)
+        freeCdbComponentDatabases(GpAliveSegmentsInfo.cdbComponentDatabases);
+
+    GpAliveSegmentsInfo.cdbComponentDatabases = orig_cdb;
+    GpAliveSegmentsInfo.aliveSegmentsCount = orig_seg_count;
+}
+
+/* Builds the QueryResource for a query and stores it in the static 'resource' */
+void buildQueryResource(int segs_num,
+                        char * segs_hostips[])
+{
+    resource = (QueryResource *)palloc(sizeof(QueryResource));
+
+    resource->segments = NULL;
+    for (int i = 0; i < segs_num; ++i)
+    {
+        Segment *segment = (Segment *)palloc0(sizeof(Segment));
+        segment->hostip = pstrdup(segs_hostips[i]);
+        segment->segindex = i;
+        segment->alive = true;
+
+        resource->segments = lappend(resource->segments, segment);
+    }
+}
+
+/* Frees the QueryResource of a query */
+void freeQueryResource()
+{
+    if (resource)
+    {
+        ListCell *cell;
+
+        if (resource->segments)
+        {
+            cell = list_head(resource->segments);
+            while (cell != NULL)
+            {
+                ListCell *tmp = cell;
+                cell = lnext(cell);
+
+                pfree(((Segment *)lfirst(tmp))->hostip);
+                pfree(lfirst(tmp));
+                pfree(tmp);
+            }
+        }
+
+        pfree(resource);
+        resource = NULL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/hd_work_mgr_mock.h
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_mock.h b/src/backend/access/external/test/hd_work_mgr_mock.h
new file mode 100644
index 0000000..2eda33e
--- /dev/null
+++ b/src/backend/access/external/test/hd_work_mgr_mock.h
@@ -0,0 +1,84 @@
+#ifndef HD_WORK_MGR_MOCK_
+#define HD_WORK_MGR_MOCK_
+
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "../hd_work_mgr.c"
+
+static CdbComponentDatabases *orig_cdb = NULL;
+static int orig_seg_count = -1;
+
+static QueryResource *resource = NULL;
+
+struct AliveSegmentsInfo
+{
+    uint64 fts_statusVersion;
+    TransactionId tid;
+
+    /* Release gangs */
+    bool cleanGangs;
+
+    /* Used for debug & test */
+    bool forceUpdate;
+    int failed_segmentid_start;
+    int failed_segmentid_number;
+
+    /* Used for dispatcher. */
+    int4 aliveSegmentsCount;
+    int4 singleton_segindex;
+    Bitmapset *aliveSegmentsBitmap;
+    struct CdbComponentDatabases *cdbComponentDatabases;
+};
+
+typedef struct AliveSegmentsInfo AliveSegmentsInfo;
+
+AliveSegmentsInfo GpAliveSegmentsInfo = {0, 0, false, false, 0, 0, UNINITIALIZED_GP_IDENTITY_VALUE, 0, NULL, NULL};
+
+/*
+ * Helper functions copied from backend/cdb/cdbutils.c
+ */
+
+/*
+ * _freeCdbComponentDatabases
+ *
+ * Releases the storage occupied by the CdbComponentDatabases
+ * struct pointed to by the argument.
+ */
+void
+_freeCdbComponentDatabases(CdbComponentDatabases *pDBs);
+
+/*
+ * _freeCdbComponentDatabaseInfo:
+ * Releases any storage allocated for member variables of a CdbComponentDatabaseInfo struct.
+ */
+void
+_freeCdbComponentDatabaseInfo(CdbComponentDatabaseInfo *cdi);
+
+/*
+ * Helper functions to create and restore the GpAliveSegmentsInfo.cdbComponentDatabases element
+ * used by hd_work_mgr
+ */
+
+/*
+ * Builds an array of CdbComponentDatabaseInfo.
+ * Each segment is assigned a sequence number and an ip.
+ * segs_num - the number of segments
+ * segs_hostips - array of the ip of each segment
+ * primaries_map - array of which segments are primaries
+ */
+void buildCdbComponentDatabases(int segs_num,
+                                char* segs_hostips[],
+                                bool primaries_map[]);
+
+
+void restoreCdbComponentDatabases();
+
+void buildQueryResource(int segs_num,
+                        char * segs_hostips[]);
+void freeQueryResource();
+
+#endif //HD_WORK_MGR_MOCK_

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7c2f615d/src/backend/access/external/test/hd_work_mgr_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_test.c b/src/backend/access/external/test/hd_work_mgr_test.c
new file mode 100644
index 0000000..382927b
--- /dev/null
+++ b/src/backend/access/external/test/hd_work_mgr_test.c
@@ -0,0 +1,33 @@
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "hd_work_mgr_mock.c"
+#include "hd_work_mgr_do_segment_clustering_by_host_test.c"
+#include "hd_work_mgr_allocate_fragments_to_datanodes_test.c"
+#include "hd_work_mgr_distribute_work_2_gp_segments_test.c"
+
+int
+main(int argc, char* argv[])
+{
+    cmockery_parse_arguments(argc, argv);
+
+    const UnitTest tests[] = {
+        unit_test(test__do_segment_clustering_by_host__10SegmentsOn3Hosts),
+        unit_test(test__get_dn_processing_load),
+        unit_test(test__create_allocated_fragment__NoUserData),
+        unit_test(test__create_allocated_fragment__WithUserData),
+        unit_test(test__allocate_fragments_to_datanodes__4Fragments10Hosts3Replicates),
+        unit_test(test__allocate_fragments_to_datanodes__4Fragments3Hosts2Replicates),
+        unit_test(test__allocate_fragments_to_datanodes__4Fragments3Hosts1Replicates),
+        unit_test(test__allocate_fragments_to_datanodes__7Fragments10Hosts1Replicates),
+        unit_test(test__distribute_work_to_gp_segments__big_cluster_few_active_nodes),
+        unit_test(test__distribute_work_to_gp_segments__big_cluster_many_active_nodes),
+        unit_test(test__distribute_work_to_gp_segments__small_cluster),
+        unit_test(test__distribute_work_to_gp_segments__small_cluster_many_active_nodes),
+        unit_test(test__distribute_work_to_gp_segments__small_cluster_few_replicas)
+    };
+    return run_tests(tests);
+}
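
A hypothetical sketch (not part of the commit) of how a further case could plug into this harness; the test name and the two-segment layout are invented for illustration, while the mock helpers and the will_return() setup mirror the existing tests:

/* Hypothetical: two segments sharing one ip should cluster into one GpHost. */
void
test__do_segment_clustering_by_host__2SegmentsOn1Host(void **state)
{
    char* array_of_segs[2] = {"1.2.3.1", "1.2.3.1"};
    bool array_of_primaries[2] = {true, true};

    /* install the mocked segment map and query resource */
    buildCdbComponentDatabases(2, array_of_segs, array_of_primaries);
    buildQueryResource(2, array_of_segs);
    will_return(GetActiveQueryResource, resource);

    /* both segments share one ip, so exactly one group is expected */
    List* groups = do_segment_clustering_by_host();
    assert_int_equal(list_length(groups), 1);

    freeQueryResource();
    restoreCdbComponentDatabases();
}

Registering it would be one more unit_test(...) entry in the tests[] array of main() above.
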