hawq-commits mailing list archives

From h...@apache.org
Subject [2/2] incubator-hawq git commit: HAWQ-288. Support hawqfaultinjector as an internal utility for fault injection purpose - client side
Date Tue, 29 Dec 2015 09:29:53 GMT
HAWQ-288. Support hawqfaultinjector as an internal utility for fault injection purpose - client side


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/8cdae414
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/8cdae414
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/8cdae414

Branch: refs/heads/master
Commit: 8cdae4145822373f2fe8de0083179bcbdece7a75
Parents: 4246999
Author: Ruilong Huo <rhuo@pivotal.io>
Authored: Tue Dec 29 17:27:02 2015 +0800
Committer: Ruilong Huo <rhuo@pivotal.io>
Committed: Tue Dec 29 17:29:07 2015 +0800

----------------------------------------------------------------------
 src/all_src_files.txt                           |   1 +
 src/backend/postmaster/postmaster.c             |  25 +-
 src/bin/Makefile                                |   2 +-
 src/bin/gpmirrortransition/.gitignore           |   2 +
 src/bin/gpmirrortransition/Makefile             |  49 ++
 src/bin/gpmirrortransition/gpmirrortransition.c | 353 +++++++++
 src/include/pg_config_manual.h                  |   2 +-
 tools/bin/Makefile                              |   6 +-
 tools/bin/gppylib/commands/base.py              |   8 +
 tools/bin/hawqfaultinjector                     |  27 +
 tools/bin/hawqpylib/Makefile                    |  28 +
 tools/bin/hawqpylib/hawqarray.py                | 729 +++++++++++++++++++
 tools/bin/hawqpylib/mainUtils.py                | 655 +++++++++++++++++
 tools/bin/hawqpylib/programs/__init__.py        |   0
 tools/bin/hawqpylib/programs/clsInjectFault.py  | 441 +++++++++++
 .../hawqpylib/system/ComputeCatalogUpdate.py    | 442 +++++++++++
 tools/bin/hawqpylib/system/__init__.py          |   0
 .../hawqpylib/system/configurationImplHAWQ.py   | 474 ++++++++++++
 18 files changed, 3239 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/all_src_files.txt
----------------------------------------------------------------------
diff --git a/src/all_src_files.txt b/src/all_src_files.txt
index 68738d7..f7ca7ad 100644
--- a/src/all_src_files.txt
+++ b/src/all_src_files.txt
@@ -1450,6 +1450,7 @@ bin/gpfusion/common.h
 bin/gpfusion/gpbridgeapi.c
 bin/gpfusion/gpdbwritableformatter.c
 bin/gpfusion/pxf.c
+bin/gpmirrortransition/gpmirrortransition.c
 bin/gpupgrade/gpmodcatversion.c
 bin/gpupgrade/gpviewcp.c
 bin/initdb/initdb.c

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/backend/postmaster/postmaster.c
----------------------------------------------------------------------
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index ff954b8..c2fafb3 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2675,6 +2675,30 @@ readNextStringFromString(char *buf, int *offsetInOut, int length)
 	return result;
 }
 
+#ifdef FAULT_INJECTOR
+/**
+ *  Returns 0 if the string could not be read and sets *wasRead (if wasRead is non-NULL) to false
+ */
+static int
+readIntFromString( char *buf, int *offsetInOut, int length, bool *wasRead)
+{
+	int res;
+	char *val = readNextStringFromString(buf, offsetInOut, length);
+	if (val == NULL)
+	{
+		if (wasRead)
+			*wasRead = false;
+		return 0;
+	}
+
+	if (wasRead)
+		*wasRead = true;
+	res = atoi(val);
+	pfree(val);
+	return res;
+}
+#endif
+
 static void sendPrimaryMirrorTransitionResult( const char *msg)
 {
 	StringInfoData buf;
@@ -2792,7 +2816,6 @@ static void processTransitionRequest_getFaultInjectStatus(void * buf, int *offse
 static void
 processTransitionRequest_faultInject(void * inputBuf, int *offsetPtr, int length)
 {
-#undef FAULT_INJECTOR
 #ifdef FAULT_INJECTOR
 	bool wasRead;
 	char *faultName = readNextStringFromString(inputBuf, offsetPtr, length);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/bin/Makefile
----------------------------------------------------------------------
diff --git a/src/bin/Makefile b/src/bin/Makefile
index 73425b9..9d4919d 100644
--- a/src/bin/Makefile
+++ b/src/bin/Makefile
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
 
 DIRS = initdb ipcclean pg_ctl pg_dump pgbench \
 	psql scripts pg_config pg_controldata pg_resetxlog \
-	gpfilesystem/hdfs gpupgrade \
+	gpfilesystem/hdfs gpmirrortransition gpupgrade \
 	gpfusion gp_workfile_mgr gpcheckhdfs gpfdist
 
 all install installdirs uninstall distprep clean distclean maintainer-clean:

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/bin/gpmirrortransition/.gitignore
----------------------------------------------------------------------
diff --git a/src/bin/gpmirrortransition/.gitignore b/src/bin/gpmirrortransition/.gitignore
new file mode 100644
index 0000000..e999eb8
--- /dev/null
+++ b/src/bin/gpmirrortransition/.gitignore
@@ -0,0 +1,2 @@
+gp_primarymirror
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/bin/gpmirrortransition/Makefile
----------------------------------------------------------------------
diff --git a/src/bin/gpmirrortransition/Makefile b/src/bin/gpmirrortransition/Makefile
new file mode 100755
index 0000000..93e1e33
--- /dev/null
+++ b/src/bin/gpmirrortransition/Makefile
@@ -0,0 +1,49 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/bin/gpmirrortransition
+#
+# Portions Copyright (c) 2009 Greenplum Inc 
+#
+# This Makefile was copied from the pg_dump makefile and modified accordingly
+#
+# $PostgreSQL: pgsql/src/bin/gpmirrortransition/Makefile,v 1.62 2006/03/05 15:58:50 momjian Exp $
+#
+#-------------------------------------------------------------------------
+
+PGFILEDESC = "gp_primarymirror - inform a segment of a change in primary/mirror status"
+subdir = src/bin/gpmirrortransition
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+# The frontend doesn't need everything that's in LIBS, some are backend only
+LIBS := $(filter-out -llapack -lblas -lf2c -lresolv, $(LIBS))
+# This program isn't interactive, so doesn't need these
+LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses -lcurl -lssl -lcrypto, $(LIBS))
+
+# the use of tempnam in pg_backup_tar.c causes a warning when using newer versions of GCC
+override CPPFLAGS := -Wno-deprecated-declarations -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS=gpmirrortransition.o $(WIN32RES)
+
+EXTRA_OBJS = $(top_builddir)/src/backend/libpq/ip.o $(top_builddir)/src/backend/postmaster/primary_mirror_transition_client.o $(top_builddir)/src/timezone/gptime.o
+
+all: submake-libpq submake-libpgport submake-backend gp_primarymirror
+
+gp_primarymirror: gpmirrortransition.o $(OBJS) $(EXTRA_OBJS) $(libpq_builddir)/libpq.a 
+	$(CC) $(CFLAGS) $(OBJS) $(EXTRA_OBJS) $(libpq_pgport) $(LDFLAGS) $(LIBS) -o $@$(X)
+
+.PHONY: submake-backend
+submake-backend:
+	$(MAKE) -C $(top_builddir)/src/backend/libpq ip.o
+
+install: all installdirs
+	$(INSTALL_PROGRAM) gp_primarymirror$(X) '$(DESTDIR)$(bindir)'/gp_primarymirror$(X)
+
+installdirs:
+	$(MKDIR_P) '$(DESTDIR)$(bindir)'
+
+uninstall:
+	rm -f $(addprefix '$(DESTDIR)$(bindir)'/, gp_primarymirror$(X))
+
+clean distclean maintainer-clean:
+	rm -f gp_primarymirror$(X) $(OBJS) gpmirrortransition.o

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/bin/gpmirrortransition/gpmirrortransition.c
----------------------------------------------------------------------
diff --git a/src/bin/gpmirrortransition/gpmirrortransition.c b/src/bin/gpmirrortransition/gpmirrortransition.c
new file mode 100755
index 0000000..06e952a
--- /dev/null
+++ b/src/bin/gpmirrortransition/gpmirrortransition.c
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Utility to contact a segment and issue a primary/mirror mode transition
+ */
+#include "postmaster/primary_mirror_mode.h"
+#include "postmaster/primary_mirror_transition_client.h"
+#include <unistd.h>
+#include "libpq/pqcomm.h"
+#include "libpq/ip.h"
+
+#ifdef HAVE_GETOPT_H
+#include <getopt.h>
+#endif
+
+/* buffer size for message to segment */
+#define SEGMENT_MSG_BUF_SIZE     4096
+
+/**
+ * gpmirrortransition builds a message from the given parameters and transmits it to the given server as
+ *    a mirror transition message.
+ */
+
+
+static inline bool
+isEmpty(char *str)
+{
+	return str == NULL || str[0] == '\0';
+}
+
+static bool
+gpCheckForNeedToExitFn(void)
+{
+	return false;
+}
+
+static void
+gpMirrorErrorLogFunction(char *str)
+{
+	fprintf(stderr, "%s\n", str);
+}
+
+static void
+gpMirrorReceivedDataCallbackFunction(char *buf)
+{
+	fprintf(stderr, "%s\n", buf);
+}
+
+/**
+ * *addrList will be filled in with the address(es) of the host/port when true is returned
+ *
+ * host/port may not be NULL
+ */
+static bool
+determineTargetHost( struct addrinfo **addrList, char *host, char *port)
+{
+	struct addrinfo hint;
+	int			ret;
+
+	*addrList = NULL;
+
+	/* Initialize hint structure */
+	MemSet(&hint, 0, sizeof(hint));
+	hint.ai_socktype = SOCK_STREAM;
+	hint.ai_family = AF_UNSPEC;
+
+	/* Using pghost, so we have to look-up the hostname */
+	hint.ai_family = AF_UNSPEC;
+
+	/* Use pg_getaddrinfo_all() to resolve the address */
+	ret = pg_getaddrinfo_all(host, port, &hint, addrList);
+	if (ret || ! *addrList)
+	{
+		fprintf(stderr,"could not translate host name \"%s\" to address: %s\n", host, gai_strerror(ret));
+		return false;
+	}
+	return true;
+}
+
+static char*
+readFully(FILE *f, int *msgLenOut)
+{
+	int bufSize = 10;
+	char *buf = malloc(bufSize * sizeof(char));
+	int bufOffset = 0;
+
+	if ( buf == NULL )
+	{
+		fprintf(stderr, "Out of memory\n");
+		return NULL;
+	}
+
+	for ( ;; )
+	{
+		int numRead;
+
+		errno = 0;
+		numRead = fread(buf + bufOffset, sizeof(char), bufSize - bufOffset, f);
+		if ( errno != 0 )
+		{
+			if ( feof(f))
+				break;
+			fprintf( stderr, "Error reading input.  Error code %d\n", errno);
+			return NULL;
+		}
+		else if ( numRead <= 0 && feof(f))
+			break;
+
+		bufOffset += numRead;
+
+		if ( bufOffset == bufSize )
+		{
+			// increase size!
+			bufSize *= 2;
+			buf = realloc(buf, bufSize * sizeof(char));
+			if ( buf == NULL )
+			{
+				fprintf(stderr, "Out of memory\n");
+				return NULL;
+			}
+		}
+	}
+
+	*msgLenOut = bufOffset;
+	return buf;
+}
+
+
+int
+main(int argc, char **argv)
+{
+	struct addrinfo *addrList = NULL;
+
+	char *host = NULL, *port = NULL, *inputFile = NULL;
+
+	char *mode = NULL;
+	char *status = NULL;
+	char *seg_addr = NULL;
+	char *seg_pm_port = NULL;
+	char *seg_rep_port = NULL;
+	char *peer_addr = NULL;
+	char *peer_pm_port = NULL;
+	char *peer_rep_port = NULL;
+
+	char *num_retries_str = NULL;
+	char *transition_timeout_str = NULL;
+	
+	int num_retries = 20;
+	int transition_timeout = 3600;  /* 1 hour */
+	
+	char opt;
+
+	char msgBuffer[SEGMENT_MSG_BUF_SIZE];
+	char *msg = NULL;
+	int msgLen = 0;
+
+	while ((opt = getopt(argc, argv, "m:s:H:P:R:h:p:r:i:n:t:")) != -1)
+	{
+		switch (opt)
+		{
+			case 'i':
+				inputFile = optarg;
+				break;
+			case 'm':
+				mode = optarg;
+				break;
+			case 's':
+				status = optarg;
+				break;
+			case 'H':
+				seg_addr = optarg;
+				break;
+			case 'P':
+				seg_pm_port = optarg;
+				break;
+			case 'R':
+				seg_rep_port = optarg;
+				break;
+			case 'h':
+				host = peer_addr = optarg;
+				break;
+			case 'p':
+				port = peer_pm_port = optarg;
+				break;
+			case 'r':
+				peer_rep_port = optarg;
+				break;
+			case 'n':
+				num_retries_str = optarg;
+				break;
+			case 't':
+				transition_timeout_str = optarg;
+				break;
+			case '?':
+				fprintf(stderr, "Unrecognized option: -%c\n", optopt);
+		}
+	}
+
+	if (num_retries_str != NULL)
+	{
+		num_retries = (int) strtol(num_retries_str, NULL, 10);
+		if (num_retries == 0 || errno == ERANGE)
+		{
+			fprintf(stderr, "Invalid num_retries (-n) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+	}
+	
+	if (transition_timeout_str != NULL)
+	{
+		transition_timeout = (int) strtol (transition_timeout_str, NULL, 10);
+		if (transition_timeout == 0 || errno == ERANGE)
+		{
+			fprintf(stderr, "Invalid transition_timeout (-t) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+	}
+
+	/* check if input file parameter is passed */
+	if (seg_addr == NULL)
+	{
+		if ( host == NULL)
+		{
+			fprintf(stderr, "Missing host (-h) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if ( port == NULL )
+		{
+			fprintf(stderr, "Missing port (-p) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+
+		/* find the target machine */
+		if ( ! determineTargetHost(&addrList, host, port))
+		{
+			return TRANS_ERRCODE_ERROR_HOST_LOOKUP_FAILED;
+		}
+
+		/* load the input message into memory */
+		if ( inputFile == NULL)
+		{
+			msg = readFully(stdin, &msgLen);
+		}
+		else
+		{
+
+			FILE *f = fopen(inputFile, "r");
+			if ( f == NULL)
+			{
+				fprintf(stderr, "Unable to open file %s\n", inputFile);
+				return TRANS_ERRCODE_ERROR_READING_INPUT;
+			}
+			msg = readFully(f, &msgLen);
+			fclose(f);
+		}
+	}
+	else
+	{
+		/* build message from passed parameters */
+
+		if (mode == NULL)
+		{
+			fprintf(stderr, "Missing mode (-m) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (status == NULL)
+		{
+			fprintf(stderr, "Missing status (-s) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (seg_addr == NULL)
+		{
+			fprintf(stderr, "Missing segment host (-H) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (seg_pm_port == NULL)
+		{
+			fprintf(stderr, "Missing segment postmaster port (-P) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (seg_rep_port == NULL)
+		{
+			fprintf(stderr, "Missing segment replication port (-R) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (peer_addr == NULL)
+		{
+			fprintf(stderr, "Missing peer host (-h) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (peer_pm_port == NULL)
+		{
+			fprintf(stderr, "Missing peer postmaster port (-p) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (peer_rep_port == NULL)
+		{
+			fprintf(stderr, "Missing peer replication port (-r) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+
+		/* build message */
+		msgLen = snprintf(
+			msgBuffer, sizeof(msgBuffer),
+			"%s\n%s\n%s\n%s\n%s\n%s\n%s\n",
+			mode,
+			status,
+			seg_addr,
+			seg_rep_port,
+			peer_addr,
+			peer_rep_port,
+			peer_pm_port
+			);
+
+		msg = msgBuffer;
+
+		/* find the target machine */
+		if (!determineTargetHost(&addrList, seg_addr, seg_pm_port))
+		{
+			return TRANS_ERRCODE_ERROR_HOST_LOOKUP_FAILED;
+		}
+	}
+
+	 /* check for errors while building the message */
+	if ( msg == NULL )
+	{
+		return TRANS_ERRCODE_ERROR_READING_INPUT;
+	}
+
+	/* send the message */
+	PrimaryMirrorTransitionClientInfo client;
+	client.receivedDataCallbackFn = gpMirrorReceivedDataCallbackFunction;
+	client.errorLogFn = gpMirrorErrorLogFunction;
+	client.checkForNeedToExitFn = gpCheckForNeedToExitFn;
+	return sendTransitionMessage(&client, addrList, msg, msgLen, num_retries, transition_timeout);
+}

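For reference, the gp_primarymirror utility above either assembles the newline-separated transition message from the -m/-s/-H/-P/-R/-h/-p/-r parameters or reads a prebuilt message from -i/stdin, and then sends it to the postmaster addressed by -h/-p. The following Python sketch is illustrative only (field values and the subprocess invocation are assumptions, mirroring the snprintf format in main() above); it shows how client-side tooling might build the same message and pipe it to gp_primarymirror over stdin:

import subprocess

def send_transition_message(host, port, mode, status,
                            seg_addr, seg_rep_port,
                            peer_addr, peer_rep_port, peer_pm_port):
    # Same newline-separated layout that main() builds with snprintf when the
    # -m/-s/-H/-R/-h/-r/-p parameters are supplied (values here are made up).
    msg = "%s\n%s\n%s\n%s\n%s\n%s\n%s\n" % (mode, status, seg_addr, seg_rep_port,
                                            peer_addr, peer_rep_port, peer_pm_port)
    # Without -H, gp_primarymirror reads the message from stdin and sends it
    # to the segment postmaster given by -h/-p.
    proc = subprocess.Popen(["gp_primarymirror", "-h", host, "-p", str(port)],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    out, err = proc.communicate(msg)
    return proc.returncode, out, err
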
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/include/pg_config_manual.h
----------------------------------------------------------------------
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index d97a280..002d92e 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -255,7 +255,7 @@
 /*
  * Enable injecting faults.
  */
-#define FAULT_INJECTOR 0
+#define FAULT_INJECTOR 1
 
 /*
  * Enable tracing of resource consumption during sort operations;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/Makefile
----------------------------------------------------------------------
diff --git a/tools/bin/Makefile b/tools/bin/Makefile
index e8a9847..88b9634 100644
--- a/tools/bin/Makefile
+++ b/tools/bin/Makefile
@@ -179,6 +179,7 @@ unittest2:
 PYTHON_FILES=`grep -l --exclude=Makefile --exclude=gplogfilter --exclude=gpcheckos --exclude=gpgenfsmap.py --exclude=throttlingD.py "/bin/env python" *`\
 			 `grep -l "/bin/env python" $(SRC)/../sbin/*`\
 			 `find ./gppylib -name "*.py"`\
+			 `find ./hawqpylib -name "*.py"`\
 			 `find $(SRC)/../sbin -name "*.py"`
 
 checkcode: pylint
@@ -244,9 +245,10 @@ install: all
 	${INSTALL_SCRIPT} -d ${bindir}
 	for files in `find * -maxdepth 0 -type f | grep -x -v -E "${SKIP_INSTALL}"`; do ${INSTALL_SCRIPT} $${files} ${bindir}; done
 	${MAKE} -C gppylib $@
+	${MAKE} -C hawqpylib $@
 	${MAKE} -C ext $@
-	for dirs in `find hawqpylib -type d` ; do ${INSTALL_SCRIPT} -d ${bindir}/hawqpylib/$${dirs}; done
-	for files in `find hawqpylib -type f` ; do ${INSTALL_SCRIPT} $${files} ${bindir}/hawqpylib/; done
+	# for dirs in `find hawqpylib -type d` ; do ${INSTALL_SCRIPT} -d ${bindir}/hawqpylib/$${dirs}; done
+	# for files in `find hawqpylib -type f` ; do ${INSTALL_SCRIPT} $${files} ${bindir}/hawqpylib/; done
 	${INSTALL_SCRIPT} -d ${bindir}/lib
 	for files in `find lib -type f`; do ${INSTALL_SCRIPT} $${files} ${bindir}/lib; done
 	unset LIBPATH; ./generate-greenplum-path.sh $(prefix) > ${prefix}/greenplum_path.sh

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/gppylib/commands/base.py
----------------------------------------------------------------------
diff --git a/tools/bin/gppylib/commands/base.py b/tools/bin/gppylib/commands/base.py
index 801d85f..4ca68fa 100755
--- a/tools/bin/gppylib/commands/base.py
+++ b/tools/bin/gppylib/commands/base.py
@@ -697,11 +697,19 @@ class Command:
 
     def run(self,validateAfter=False):
         faultPoint = os.getenv('GP_COMMAND_FAULT_POINT')
+        print "### DEBUG: ENV[GP_COMMAND_FAULT_POINT] = %s" % (faultPoint if faultPoint else "None")
+        print "### DEBUG: self.name = %s" % ("SomeName" if self.name else "None")
         if not faultPoint or (self.name and not self.name.startswith(faultPoint)):
+            print "### DEBUG: EXECUTE name = %s" % ("SomeName" if self.name else "None")
+            print "### DEBUG: EXECUTE cmdStr = %s" % ("SomeCmdStr" if self.cmdStr else "None")
+            print "### DEBUG: EXECUTE context = %s" % ("SomeExecContext" if self.exec_context else "None")
+            print "### DEBUG: EXECUTE remoteHost = %s" % ("SomeRemoteHost" if self.remoteHost else "None")
             self.exec_context.execute(self)
         else:
             # simulate error
+            print "### DEBUG: CommandResult"
             self.results = CommandResult(1,'Fault Injection','Fault Injection' ,False,True)
+            print self.results
         
         if validateAfter:
             self.validate()

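The Command.run() change above is the environment-variable half of the fault injection: when GP_COMMAND_FAULT_POINT is set and the command's name starts with that value, run() skips execution and records a failing CommandResult instead. A minimal sketch of how a test might exercise this (the command name and command string below are made up, not taken from this commit):

import os
from gppylib.commands.base import Command

# Any Command whose name starts with the GP_COMMAND_FAULT_POINT value is not
# executed; run() records a simulated 'Fault Injection' failure (rc=1) instead.
os.environ['GP_COMMAND_FAULT_POINT'] = 'Start segment'
cmd = Command('Start segment sdw1', 'echo start')   # hypothetical name/command
cmd.run(validateAfter=False)
print cmd.results    # CommandResult(1, 'Fault Injection', 'Fault Injection', ...)
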
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqfaultinjector
----------------------------------------------------------------------
diff --git a/tools/bin/hawqfaultinjector b/tools/bin/hawqfaultinjector
new file mode 100755
index 0000000..466dddf
--- /dev/null
+++ b/tools/bin/hawqfaultinjector
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from hawqpylib.mainUtils import *
+
+# now the rest of the imports
+from hawqpylib.programs.clsInjectFault import *
+
+#-------------------------------------------------------------------------
+if __name__ == '__main__':
+    simple_main( HAWQInjectFaultProgram.createParser, HAWQInjectFaultProgram.createProgram)
+

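hawqfaultinjector itself is a thin wrapper: it hands HAWQInjectFaultProgram.createParser and HAWQInjectFaultProgram.createProgram to simple_main() from hawqpylib.mainUtils (shown further below). As that function's docstring describes, the first factory returns an option parser and the second returns an object with run() and cleanup() methods. A hypothetical minimal program following the same contract (not part of this commit) might look like:

from optparse import OptionParser

from hawqpylib.mainUtils import simple_main


class HelloProgram:
    """Hypothetical example of the contract simple_main expects."""

    def __init__(self, options, args):
        self.options = options
        self.args = args

    def run(self):
        print "hello, %s" % self.options.name
        return 0                      # exit code handed back to simple_main

    def cleanup(self):
        pass                          # e.g. shut down a worker pool here

    @staticmethod
    def createParser():
        parser = OptionParser()
        parser.add_option("--name", dest="name", default="hawq")
        return parser

    @staticmethod
    def createProgram(options, args):
        return HelloProgram(options, args)


if __name__ == '__main__':
    simple_main(HelloProgram.createParser, HelloProgram.createProgram)
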
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/Makefile
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/Makefile b/tools/bin/hawqpylib/Makefile
new file mode 100644
index 0000000..d66f5d7
--- /dev/null
+++ b/tools/bin/hawqpylib/Makefile
@@ -0,0 +1,28 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for the management utilities
+#
+#-------------------------------------------------------------------------
+
+subdir = tools/bin/hawqpylib
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+SKIP_INSTALL=.gitignore|.p4ignore|.rcfile|Makefile|test/
+
+install:
+	${INSTALL_SCRIPT} -d ${libdir}/python/hawqpylib
+	@for file in `find * -type f | grep -v -E "${SKIP_INSTALL}"`; \
+		do \
+			echo "install $${file} into ${libdir}/python/hawqpylib/$${file}" ; \
+			${INSTALL_SCRIPT} $${file} ${libdir}/python/hawqpylib/$${file}; \
+		done
+	@for dirs in `find * -type d | grep -v test` ;\
+		do \
+			${INSTALL_SCRIPT} -d ${libdir}/python/hawqpylib/$${dirs}; \
+			for file in `find $${dirs} -type f | grep -v -E "${SKIP_INSTALL}"`; do \
+				echo "install $${file} into ${libdir}/python/hawqpylib/$${file}" ; \
+				${INSTALL_SCRIPT} $${file} ${libdir}/python/hawqpylib/$${file}; \
+			done \
+		done
+	

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/hawqarray.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/hawqarray.py b/tools/bin/hawqpylib/hawqarray.py
new file mode 100755
index 0000000..eef59b9
--- /dev/null
+++ b/tools/bin/hawqpylib/hawqarray.py
@@ -0,0 +1,729 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+  hawqarray.py:
+
+    Contains three classes representing configuration information of a
+    Greenplum array:
+
+      HAWQArray - The primary interface - collection of all HAWQDB within an array
+      HAWQDB    - represents configuration information for a single registration_order
+      Segment   - collection of all HAWQDB with the same registration_order
+"""
+
+# ============================================================================
+from datetime import date
+import copy
+import traceback
+
+from gppylib.utils import checkNotNone, checkIsInt
+from gppylib    import gplog
+from gppylib.db import dbconn
+from gppylib.gpversion import GpVersion
+from gppylib.commands.unix import *
+from hawqpylib.hawqlib import HawqXMLParser
+
+
+SYSTEM_FILESPACE = 3052        # oid of the system filespace
+
+logger = gplog.get_default_logger()
+
+DESTINATION_FILE_SPACES_DIRECTORY = "fs_directory"
+
+ROLE_MASTER  = 'm'
+ROLE_STANDBY = 's'
+ROLE_PRIMARY = 'p'
+VALID_ROLES  = [ROLE_MASTER, ROLE_STANDBY, ROLE_PRIMARY]
+
+
+STATUS_UP    = 'u'
+STATUS_DOWN  = 'd'
+VALID_STATUS = [STATUS_UP, STATUS_DOWN]
+
+
+# SegmentState values returned from gp_primarymirror.
+SEGMENT_STATE_NOT_INITIALIZED               = "NotInitialized"
+SEGMENT_STATE_INITIALIZATION                = "Initialization"
+SEGMENT_STATE_IN_CHANGE_TRACKING_TRANSITION = "InChangeTrackingTransition"
+SEGMENT_STATE_IN_RESYNCTRANSITION           = "InResyncTransition"                       
+SEGMENT_STATE_IN_SYNC_TRANSITION            = "InSyncTransition"
+SEGMENT_STATE_READY                         = "Ready"
+SEGMENT_STATE_CHANGE_TRACKING_DISABLED      = "ChangeTrackingDisabled"
+SEGMENT_STATE_FAULT                         = "Fault"
+SEGMENT_STATE_SHUTDOWN_BACKENDS             = "ShutdownBackends"
+SEGMENT_STATE_SHUTDOWN                      = "Shutdown"
+SEGMENT_STATE_IMMEDIATE_SHUTDOWN            = "ImmediateShutdown"
+
+
+MASTER_REGISTRATION_ORDER = 0
+
+class InvalidSegmentConfiguration(Exception):
+    """Exception raised when an invalid hawqarray configuration is
+    read from gp_segment_configuration or an attempt to save an 
+    invalid hawqarray configuration is made."""
+    def __init__(self, array):
+        self.array = array
+        
+    def __str__(self):
+        return "Invalid HAWQArray: %s" % self.array
+
+# ============================================================================
+# ============================================================================
+class HAWQDB:
+    """
+    HAWQDB class representing configuration information for a single
+    registration_order within a HAWQ cluster.
+    """
+
+    # --------------------------------------------------------------------
+    def __init__(self, registration_order, role, status,
+                 hostname, address, port, datadir):
+
+        self.registration_order = registration_order
+        self.role = role
+        self.status = status
+        self.hostname = hostname
+        self.address = address
+        self.port = port
+        self.datadir = datadir
+        self.catdir = datadir
+
+        # Filespace mappings for a HAWQ DB
+        self.filespaces = None
+
+        # Pending filespace creation
+        self.pending_filespace = None
+
+        # Check if the status is 'u' up, 'd' for down
+        self.valid = (status == 'u')
+        
+    # --------------------------------------------------------------------
+    def __str__(self):
+        """
+        Construct a printable string representation of a HAWQDB
+        """
+        return "%s:%s:registration_order=%s:status=%s" % (
+            self.hostname,
+            self.datadir,
+            self.registration_order,
+            self.status
+            )
+
+    #
+    # Note that this is not an ideal comparison -- it uses the string representation
+    #   for comparison
+    #
+    def __cmp__(self,other):
+        left = repr(self)
+        right = repr(other)
+        if left < right: return -1
+        elif left > right: return 1
+        else: return 0
+
+    def equalIgnoringStatus(self, other):
+        """
+        Return true if none of the "core" attributes (e.g. filespace) 
+          of two segments differ, false otherwise.
+
+        This method is used by updateSystemConfig() to know when a catalog
+        change will cause removing and re-adding a mirror segment.
+        """
+        firstStatus = self.getStatus()
+        try:
+
+            # make the elements we don't want to compare match and see if they are then equal
+            self.setStatus(other.getStatus())
+
+            return self == other
+        finally:
+            # restore status after comparison
+            self.setStatus(firstStatus)
+
+    # --------------------------------------------------------------------
+    @staticmethod
+    def getDataDirPrefix(datadir):
+        retValue = ""
+        retValue = datadir[:datadir.rfind('/')]
+        return retValue
+
+    # --------------------------------------------------------------------
+    @staticmethod
+    def getFileSpaceDirsWithNewSuffix(fileSpaceDictionary, suffix, includeSystemFilespace = True):
+        """
+        This method takes a dictionary of file spaces and returns the same dictionary with the new suffix.
+        """
+        retValue = {}
+
+        for entry in fileSpaceDictionary:
+            if entry == SYSTEM_FILESPACE and includeSystemFilespace == False:
+                continue
+            newDir = HAWQDB.getDataDirPrefix(fileSpaceDictionary[entry])
+            newDir = newDir + "/" + suffix
+            retValue[entry] = newDir
+        return retValue
+
+    # --------------------------------------------------------------------
+    def copy(self):
+        """
+        Creates a copy of the segment, shallow for everything except the filespaces map
+
+        """
+        res = copy.copy(self)
+        res.filespaces = copy.copy(self.filespaces)
+        return res
+
+    # --------------------------------------------------------------------
+    # Six simple helper functions to identify what role a segment plays:
+    #  + QD (Query Dispatcher)
+    #     + master
+    #     + standby master
+    #  + QE (Query Executor)
+    #     + primary
+    # --------------------------------------------------------------------    
+    def isMaster(self):
+        return self.role == ROLE_MASTER
+
+    def isStandby(self):
+        return self.role == ROLE_STANDBY
+
+    def isSegment(self):
+        return self.role == ROLE_PRIMARY
+
+    def isUp(self):
+        return self.status == STATUS_UP
+
+    def isDown(self):
+        return self.status == STATUS_DOWN
+
+    # --------------------------------------------------------------------
+    # getters
+    # --------------------------------------------------------------------
+    def getRegistrationOrder(self):
+        return checkNotNone("registration_order", self.registration_order)
+
+    def getRole(self):
+        return checkNotNone("role", self.role)
+
+    def getStatus(self):
+        return checkNotNone("status", self.status)
+
+    def getPort(self):
+        """
+        Returns the listening port for the postmaster for this segment.
+
+        Note: With file replication the postmaster will not be active for
+        mirrors so nothing will be listening on this port, instead the
+        "replicationPort" is used for primary-mirror communication.
+        """
+        return checkNotNone("port", self.port)
+
+    def getHostName(self):
+        """
+        Returns the actual `hostname` for the host
+
+        Note: use getSegmentAddress for the network address to use
+        """
+        return self.hostname
+
+    def getAddress(self):
+        """
+        Returns the network address to use to contact the segment (i.e. the NIC address).
+
+        """
+        return self.address
+
+    def getDataDirectory(self):
+        """
+        Return the primary datadirectory location for the segment.
+
+        Note: the datadirectory is just one of the filespace locations
+        associated with the segment; calling code should be careful not
+        to assume that this is the only directory location for this segment.
+
+        Todo: evaluate callers of this function to see if they should really
+        be dealing with a list of filespaces.
+        """
+        return checkNotNone("dataDirectory", self.datadir)
+
+    def getFilespaces(self):
+        """
+        Returns the filespace dictionary of oid->path pairs
+        """
+        return self.filespaces        
+
+
+    # --------------------------------------------------------------------
+    # setters
+    # --------------------------------------------------------------------
+    def setRegistrationOrder(self, registration_order):
+        checkNotNone("registration_order", registration_order)
+        checkIsInt("registration_order", registration_order)
+        self.registration_order = registration_order
+
+    def setRole(self, role):
+        checkNotNone("role", role)
+
+        if role not in VALID_ROLES:
+            raise Exception("Invalid role '%s'" % role)
+
+        self.role = role
+
+    def setStatus(self, status):
+        checkNotNone("status", status)
+
+        if status not in VALID_STATUS:
+            raise Exception("Invalid status '%s'" % status)
+
+        self.status = status
+
+    def setPort(self, port):
+        checkNotNone("port", port)
+        checkIsInt("port", port)
+        self.port = port
+
+    def setHostName(self, hostName):
+        # None is allowed -- don't check
+        self.hostname = hostName
+
+    def setAddress(self, address):
+        # None is allowed -- don't check
+        self.address = address
+
+    def setDataDirectory(self, dataDirectory):
+        checkNotNone("dataDirectory", dataDirectory)
+        self.datadir = dataDirectory
+
+    def addFilespace(self, oid, path):
+        """
+        Add a filespace path for this segment.
+        
+        Throws: 
+           Exception - if a path has already been specified for this segment.
+        """
+
+        # gpfilespace adds a special filespace with oid=None to indicate
+        # the filespace that it is currently building, since the filespace
+        # does not yet exist there is no valid value that could be used.
+        if oid == None:
+            if self.pending_filespace:
+                raise Exception("Duplicate filespace path for registration_order %d" % 
+                                self.registration_order)
+            self.pending_filespace = path
+            return
+        
+        # oids should always be integer values > 0
+        oid = int(oid)
+        assert(oid > 0)
+
+        # The more usual case just sets the filespace in the filespace 
+        # dictionary
+        if oid in self.filespaces:
+            raise Exception("Duplicate filespace path for "
+                            "registration_order %d filespace %d" % (self.registration_order, oid))
+        self.filespaces[oid] = path
+
+    def getPendingFilespace(self):
+        """
+        Returns the pending filespace location for this segment
+        (called by gpfilespace)
+        """
+        return self.pending_filespace
+
+
+class HAWQFilesystemObj:
+    """
+    List information for a filesystem, as stored in pg_filesystem
+    """
+    def __init__(self, oid, name, shared):
+        self.__oid = oid
+        self.__name = name
+        self.__shared = shared
+
+    def getOid(self):
+        return self.__oid
+
+    def getName(self):
+        return self.__name
+
+    def isShared(self):
+        return self.__shared == True
+
+    @staticmethod
+    def getFilesystemObj(filesystemArr, fsoid):
+        # local storage
+        if fsoid == 0:
+            return None
+        # plugin storage
+        for fsys in filesystemArr:
+            if (fsys.getOid() == fsoid):
+                return fsys
+        raise Exception("Error: invalid file system oid %d" % (fsoid))
+
+class HAWQFilespaceObj:
+    """
+    List information for a filespace, as stored in pg_filespace
+    """
+    def __init__(self, oid, name, fsys):
+        self.__oid = oid
+        self.__name = name
+        self.__fsys = fsys
+
+    def getOid(self):
+        return self.__oid
+
+    def getName(self):
+        return self.__name
+
+    def getFsys(self):
+        return self.__fsys
+
+    def isSystemFilespace(self):
+        return self.__oid == SYSTEM_FILESPACE
+
+
+
+class HAWQArray:
+    """ 
+    HAWQArray is a python class that describes a HAWQ array.
+
+    A HAWQ array consists of:
+      master         - The primary QD for the array
+      standby master - The mirror QD for the array [optional]
+      segment array  - an array of segments within the cluster
+
+    Each segment is either a single HAWQDB object, or a primary/mirror pair.
+
+    It can be initialized either from a database connection, in which case
+    it discovers the configuration information by examining the catalog, or
+    via a configuration file.
+    """
+
+    # --------------------------------------------------------------------
+    def __init__(self, hawqdbs):
+        """
+        segmentsInDb is used only by the configurationImpl* providers; it is used to track the state of the
+          segments in the database
+
+        TODO:
+
+        """
+
+        self.master = None
+        self.standbyMaster = None
+        self.segments = []
+        self.numSegments = 0
+        self.version = None
+
+        self.setFilespaces([])
+
+        for hdb in hawqdbs:
+
+            # Handle master
+            if hdb.isMaster():
+                if self.master != None:
+                    logger.error("multiple master dbs defined")
+                    raise Exception("HAWQArray - multiple master dbs defined")
+                self.master = hdb
+
+            # Handle standby
+            elif hdb.isStandby():
+                if self.standbyMaster != None:
+                    logger.error("multiple standby master dbs defined")
+                    raise Exception("HAWQArray - multiple standby master dbs defined")
+                self.standbyMaster = hdb
+
+            # Handle segments
+            elif hdb.isSegment():
+                self.addSegment(hdb)
+
+            else:
+                # Not a master, standbymaster, primary, or mirror?
+                # shouldn't even be possible.
+                logger.error("FATAL - invalid dbs defined")
+                raise Exception("Error: HAWQArray() - invalid dbs defined")
+
+        # Make sure HAWQ cluster has a master
+        if self.master is None:
+            logger.error("FATAL - no master defined!")
+            raise Exception("Error: HAWQArray() - no master defined")
+
+    def __str__(self):
+        return "Master: %s\nStandby: %s\nSegments: %s" % (str(self.master),
+                                                          str(self.standbyMaster) if self.standbyMaster else 'Not Configured',
+                                                          "\n".join([str(seg) for seg in self.segments]))
+
+    def addSegment(self, hdb):
+        if hdb.isSegment():
+            self.segments.append(hdb)
+            self.numSegments += 1
+        else:
+            raise Exception("Error: adding invalid segment to HAWQArray")
+
+
+    # --------------------------------------------------------------------
+    @staticmethod
+    def initFromCatalog(dbURL, utility=False, useAllSegmentFileSpaces=False):
+        """
+        Factory method, initializes a HAWQArray from provided database URL
+
+        Please note:
+        when useAllSegmentFileSpaces is set to true, this method adds *all* filespaces
+        to the segments of the HAWQArray; if false, only the master/standby filespaces are included.
+        This is *hacky* and we know it is not the right way to design methods/interfaces;
+        we do it so that we do not affect the behavior of existing tools like upgrade, gprecoverseg, etc.
+        """
+
+        conn = dbconn.connect(dbURL, utility)
+
+        # Get the version from the database:
+        version_str = None
+        for row in dbconn.execSQL(conn, "SELECT version()"):
+            version_str = row[0]
+        version = GpVersion(version_str)
+
+        # Only for HAWQ 2.0
+        if version.getVersionRelease() in ("2.0",):
+
+            hawq_site = HawqXMLParser(GPHOME)
+            master_data_directory  = hawq_site.get_value_from_name('hawq_master_directory')
+            segment_data_directory = hawq_site.get_value_from_name('hawq_segment_directory')
+
+            # strategy_rows = dbconn.execSQL(conn, "show gp_fault_action")
+            strategy_rows = []
+
+            config_rows = dbconn.execSQL(conn, '''
+                               SELECT sc.registration_order,
+                                      sc.role,
+                                      sc.status,
+                                      sc.hostname,
+                                      sc.address,
+                                      sc.port,
+                                      fs.oid,
+                                      CASE
+                                          WHEN sc.registration_order <= 0 THEN '%s'
+                                          ELSE '%s'
+                                      END AS datadir
+                               FROM pg_catalog.gp_segment_configuration sc,
+                                    pg_catalog.pg_filespace fs,
+                                    pg_catalog.pg_filespace_entry fse
+                               WHERE fse.fsefsoid = fs.oid
+                               ORDER BY sc.registration_order;''' %
+                               (master_data_directory, segment_data_directory))
+
+            # All of filesystem is shared storage
+            filesystemRows = dbconn.execSQL(conn, '''
+                SELECT oid, fsysname, true AS fsysshared
+                FROM pg_filesystem
+                ORDER BY fsysname
+            ''')
+
+            filesystemArr = [HAWQFilesystemObj(fsysRow[0], fsysRow[1], fsysRow[2]) for fsysRow in filesystemRows]
+
+            filespaceRows = dbconn.execSQL(conn, '''
+                SELECT oid, fsname, fsfsys AS fsoid
+                FROM pg_filespace
+                WHERE oid != %d
+                ORDER BY fsname;
+            ''' % (SYSTEM_FILESPACE))
+
+            filespaceArr = [HAWQFilespaceObj(fsRow[0], fsRow[1], HAWQFilesystemObj.getFilesystemObj(filesystemArr, fsRow[2])) for fsRow in filespaceRows]
+
+        else:
+            raise Exception("HAWQ version is invalid: %s" % version)
+
+        hawqdbs = []
+        print "### initFromCatalog ###"
+        hdb = None
+        for row in config_rows:
+
+            print row
+
+            # Extract fields from the row
+            (registration_order, role, status, hostname, 
+             address, port, fsoid, datadir) = row
+
+            # In GPSQL, only the master maintains the filespace information.
+            # if registration_order != MASTER_REGISTRATION_ORDER and \
+            #    fsoid != SYSTEM_FILESPACE and \
+            #    not useAllSegmentFileSpaces:
+            #    print "### initFromCatalog ... continue ###"
+            #    continue
+
+            # The query returns all the filespaces for a segment on separate
+            # rows.  If this row is the same dbid as the previous row simply
+            # add this filespace to the existing list, otherwise create a
+            # new segment.
+            # if seg and seg.getSegmentRegistrationOrder() == registration_order:
+            #     seg.addSegmentFilespace(fsoid, fslocation)
+            # else:
+            #     seg = HAWQDB(registration_order, role, status, 
+            #                 hostname, address, port, datadir)
+            #    segments.append(seg)
+
+            hdb = HAWQDB(registration_order, role, status, 
+                         hostname, address, port, datadir)
+            print "### initFromCatalog ... hdb ###"
+            print hdb
+            hawqdbs.append(hdb)
+            print "### initFromCatalog ... hawqdbs ###"
+            print hawqdbs
+        
+        conn.close()
+        
+        # origSegments = [seg.copy() for seg in segments]
+        
+        array = HAWQArray(hawqdbs)
+        array.version = version
+        array.setFilespaces(filespaceArr)
+        array.setFilesystem(filesystemArr)
+        
+        return array
+
+    # --------------------------------------------------------------------
+    def is_array_valid(self):
+        """Checks that each array is in a valid state"""
+
+        if self.master.getStatus() != STATUS_UP:
+            return False
+
+        if self.standbyMaster and self.standbyMaster.getStatus() != STATUS_UP:
+            return False
+
+        for seg in self.segments:
+            if not seg.status == STATUS_UP:
+                return False
+        return True
+
+    # --------------------------------------------------------------------
+    def setFilesystem(self, filesystemArr):
+        """
+        @param filesystemArr of GpFilesystemObj objects
+        """
+        self.filesystemArr = [fsys for fsys in filesystemArr]
+
+    def getFilesystem(self):
+        """
+        @return a newly allocated list of GpFilespaceObj objects, will have been sorted by filesystem name
+        """
+        return [fsys for fsys in self.filesystemArr]
+
+    def setFilespaces(self, filespaceArr):
+        """
+        @param filespaceArr of GpFilespaceObj objects
+        """
+        self.filespaceArr = [fs for fs in filespaceArr]
+
+    def getFilespaces(self, includeSystemFilespace=True):
+        """
+        @return a newly allocated list of GpFilespaceObj objects, will have been sorted by filespace name
+        """
+        return [fs for fs in self.filespaceArr if includeSystemFilespace or not fs.isSystemFilespace()]
+
+    def getNonSystemFilespaces(self):
+        """
+        @return a newly allocated list of GpFilespaceObj objects, will have been sorted by filespace name
+        """
+        return [fs for fs in self.filespaceArr if not fs.isSystemFilespace()]
+
+    def getAllFilespaces(self):
+        """
+        @return a newly allocated list of GpFilespaceObj objects, will have been sorted by filespace name
+        """
+        return [fs for fs in self.filespaceArr]
+
+    # --------------------------------------------------------------
+    def getFileSpaceName(self, filespaceOid):
+        retValue = None
+        
+        if self.filespaceArr != None:
+            for entry in self.filespaceArr:
+                if entry.getOid() == filespaceOid:
+                    retValue = entry.getName()
+                    break
+        return retValue
+
+    # --------------------------------------------------------------
+    def getFileSpaceOid(self, filespaceName):
+        retValue = None
+        
+        if self.filespaceArr != None:
+            for entry in self.filespaceArr:
+                if entry.getName() == filespaceName:
+                    retValue = entry.getOid()
+                    break
+        return retValue
+
+    # --------------------------------------------------------------
+    def isFileSpaceShared(self, filespaceOid):
+        retValue = False
+        
+        if self.filespaceArr != None:
+            for entry in self.filespaceArr:
+                if entry.getOid() == filespaceOid:
+                    retValue = entry.getFsys() != None and entry.getFsys().isShared()
+                    break
+        return retValue
+
+
+    # --------------------------------------------------------------------
+    def getDbList(self):
+        """
+        Return a list of all HAWQDB objects that make up the array
+        """
+        dbs=[]
+
+        dbs.append(self.master)
+
+        if self.standbyMaster:
+            dbs.append(self.standbyMaster)
+
+        dbs.extend(self.getSegDbList())
+
+        return dbs
+
+    # --------------------------------------------------------------------
+    def getHostList(self, includeExpansionSegs = False):
+        """
+        Return a list of all Hosts that make up the array
+        """
+        hostList = []
+
+        hostList.append(self.master.getHostName())
+
+        if self.standbyMaster:
+            hostList.append(self.standbyMaster.getHostName())
+            
+        dbList = self.getDbList()
+        for db in dbList:
+            if db.getHostName() in hostList:
+                continue
+            else:
+                hostList.append(db.getHostName())
+
+        return hostList
+
+    def getSegDbList(self):
+        """Return a list of all HAWQDB objects for all segments in the array"""
+        dbs=[]
+
+        for seg in self.segments:
+            dbs.append(seg)
+
+        return dbs
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/mainUtils.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/mainUtils.py b/tools/bin/hawqpylib/mainUtils.py
new file mode 100644
index 0000000..d0148e9
--- /dev/null
+++ b/tools/bin/hawqpylib/mainUtils.py
@@ -0,0 +1,655 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# Line too long - pylint: disable=C0301
+# Invalid name  - pylint: disable=C0103
+
+"""
+mainUtils.py
+------------
+
+This file provides a rudimentary framework to support top-level option
+parsing, initialization and cleanup logic common to multiple programs.
+
+It also implements workarounds to make other modules we use like
+GpCoverage() work properly.
+
+The primary interface function is 'simple_main'.  For an example of
+how it is expected to be used, see gprecoverseg.
+
+It is anticipated that the functionality of this file will grow as we
+extend common functions of our gp utilities.  Please keep this in mind
+and try to avoid placing logic for a specific utility here.
+"""
+
+import os, sys, signal, errno, yaml
+
+gProgramName = os.path.split(sys.argv[0])[-1]
+if sys.version_info < (2, 5, 0):
+    sys.exit(
+'''Error: %s is supported on Python versions 2.5 or greater
+Please upgrade python installed on this machine.''' % gProgramName)
+
+from gppylib import gplog
+from gppylib.commands import gp, unix
+from gppylib.commands.base import ExecutionError
+from gppylib.system import configurationInterface, fileSystemInterface, fileSystemImplOs
+from gppylib.system import osInterface, osImplNative, faultProberInterface, faultProberImplGpdb
+from hawqpylib.system import configurationImplHAWQ
+from optparse import OptionGroup, OptionParser, SUPPRESS_HELP
+from gppylib.gpcoverage import GpCoverage
+from lockfile.pidlockfile import PIDLockFile, LockTimeout
+
+
+def getProgramName():
+    """
+    Return the name of the current top-level program from sys.argv[0]
+    or the programNameOverride option passed to simple_main via mainOptions.
+    """
+    global gProgramName
+    return gProgramName
+
+
+class SimpleMainLock:
+    """
+    Tools like gprecoverseg prohibit running multiple instances at the same time
+    via a simple lock file created in the MASTER_DATA_DIRECTORY.  This class takes
+    care of the work to manage this lock as appropriate based on the mainOptions
+    specified.
+
+    Note that in some cases, the utility may want to recursively invoke
+    itself (e.g. gprecoverseg -r).  To handle this, the caller may specify
+    the name of an environment variable holding the pid already acquired by
+    the parent process.
+    """
+    def __init__(self, mainOptions):
+        self.pidfilename   = mainOptions.get('pidfilename', None)       # the file we're using for locking
+        self.parentpidvar  = mainOptions.get('parentpidvar', None)      # environment variable holding parent pid
+        self.parentpid     = None                                       # parent pid which already has the lock
+        self.ppath         = None                                       # complete path to the lock file
+        self.pidlockfile   = None                                       # PIDLockFile object
+        self.pidfilepid    = None                                       # pid of the process which has the lock
+        self.locktorelease = None                                       # PIDLockFile object we should release when done
+
+        if self.parentpidvar is not None and self.parentpidvar in os.environ:
+            self.parentpid = int(os.environ[self.parentpidvar])
+
+        if self.pidfilename is not None:
+            self.ppath       = os.path.join(gp.get_masterdatadir(), self.pidfilename)
+            self.pidlockfile = PIDLockFile( self.ppath )
+
+
+    def acquire(self):
+        """
+        Attempts to acquire the lock this process needs to proceed.
+
+        Returns None on successful acquisition of the lock or 
+          the pid of the other process which already has the lock.
+        """
+        # nothing to do if the utility requires no locking
+        if self.pidlockfile is None:
+            return None
+
+        # look for a lock file
+        self.pidfilepid = self.pidlockfile.read_pid()
+        if self.pidfilepid is not None:
+
+            # we found a lock file
+            # allow the process to proceed if the locker was our parent
+            if self.pidfilepid == self.parentpid:
+                return None
+
+            # cleanup stale locks
+            try:
+                os.kill(self.pidfilepid, signal.SIG_DFL)
+            except OSError, exc:
+                if exc.errno == errno.ESRCH:
+                    self.pidlockfile.break_lock()
+                    self.pidfilepid = None
+
+        # try and acquire the lock
+        try:
+            self.pidlockfile.acquire(1)
+
+        except LockTimeout:
+            self.pidfilepid = self.pidlockfile.read_pid()
+            return self.pidfilepid
+
+        # we have the lock
+        # prepare for a later call to release() and take good
+        # care of the process environment for the sake of our children
+        self.locktorelease = self.pidlockfile
+        self.pidfilepid    = self.pidlockfile.read_pid()
+        if self.parentpidvar is not None:
+            os.environ[self.parentpidvar] = str(self.pidfilepid)
+
+        return None
+
+
+    def release(self):
+        """
+        Releases the lock this process acquired.
+        """
+        if self.locktorelease is not None:
+            self.locktorelease.release()
+            self.locktorelease = None
+
+
+
+#
+# exceptions we handle specially by the simple_main framework.
+#
+
+class ProgramArgumentValidationException(Exception):
+    """
+    Throw this out to main to have the message possibly
+    printed with a help suggestion.
+    """
+    def __init__(self, msg, shouldPrintHelp=False):
+        "init"
+        Exception.__init__(self)
+        self.__shouldPrintHelp = shouldPrintHelp
+        self.__msg = msg
+
+    def shouldPrintHelp(self): 
+        "shouldPrintHelp"
+        return self.__shouldPrintHelp
+
+    def getMessage(self): 
+        "getMessage"
+        return self.__msg
+
+
+class ExceptionNoStackTraceNeeded(Exception):
+    """
+    Our code throws this exception when we encounter a condition
+    we know can arise which demands immediate termination.
+    """
+    pass
+
+
+class UserAbortedException(Exception):
+    """
+    UserAbortedException should be thrown when a user decides to stop the 
+    program (at a y/n prompt, for example).
+    """
+    pass
+
+
+def simple_main( createOptionParserFn, createCommandFn, mainOptions=None) :
+    """
+     createOptionParserFn : a function that takes no arguments and returns an OptParser
+     createCommandFn : a function that takes two arguments (the options, and the args that are not processed into
+                       options) and returns an object that has "run" and "cleanup" functions.  Its "run" function must
+                       run and return an exit code.  "cleanup" will be called before the program exits;
+                       it can be used, for example, to clean up a worker pool
+
+     mainOptions can include: forceQuietOutput (map to bool),
+                              programNameOverride (map to string)
+                              suppressStartupLogMessage (map to bool)
+                              useHelperToolLogging (map to bool)
+                              setNonuserOnToolLogger (map to bool, defaults to false)
+                              pidfilename (string)
+                              parentpidvar (string)
+
+    """
+    coverage = GpCoverage()
+    coverage.start()
+    try:
+        simple_main_internal(createOptionParserFn, createCommandFn, mainOptions)
+    finally:
+        coverage.stop()
+        coverage.generate_report()
+
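+# Usage sketch (hypothetical caller; MyProgram and create_parser are
+# illustrative names, not part of this module):
+#
+#   from optparse import OptionParser
+#
+#   def create_parser():
+#       return OptionParser(usage='%prog [options]')
+#
+#   class MyProgram:
+#       def __init__(self, options, args):
+#           self.options, self.args = options, args
+#       def run(self):
+#           return 0      # exit code handed back to simple_main
+#       def cleanup(self):
+#           pass          # e.g. shut down a worker pool
+#
+#   simple_main(create_parser, lambda options, args: MyProgram(options, args))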
+
+def simple_main_internal(createOptionParserFn, createCommandFn, mainOptions):
+    """
+    If the caller specifies 'pidfilename' in mainOptions, we manage the
+    specified pid file within the MASTER_DATA_DIRECTORY before proceeding
+    to execute the specified program, and we clean up the pid file when
+    we're done.
+    """
+    sml = None
+    if mainOptions is not None and 'pidfilename' in mainOptions:
+        sml      = SimpleMainLock(mainOptions)
+        otherpid = sml.acquire()
+        if otherpid is not None:
+            logger = gplog.get_default_logger()
+            logger.error("An instance of %s is already running (pid %s)" % (getProgramName(), otherpid))
+            return
+
+    # at this point we have whatever lock we require
+    try:
+        simple_main_locked(createOptionParserFn, createCommandFn, mainOptions)
+    finally:
+        if sml is not None:
+            sml.release()
+
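+# Locking sketch (hypothetical option values; create_parser and create_program
+# are the caller's factory functions, as in the sketch above): passing
+# 'pidfilename' makes simple_main serialize concurrent invocations through a
+# pid file kept under the master data directory, while 'parentpidvar' names the
+# environment variable through which helper utilities launched by this process
+# can run under the parent's lock.
+#
+#   simple_main(create_parser, create_program,
+#               mainOptions={'pidfilename'  : 'hawq_mytool.pid',     # hypothetical file name
+#                            'parentpidvar' : 'HAWQ_MYTOOL_PID'})    # hypothetical env var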
+
+def simple_main_locked(createOptionParserFn, createCommandFn, mainOptions):
+    """
+    Not to be called externally -- use simple_main instead
+    """
+    logger = gplog.get_default_logger()
+
+    configurationInterface.registerConfigurationProvider( configurationImplHAWQ.GpConfigurationProviderUsingHAWQCatalog())
+    fileSystemInterface.registerFileSystemProvider( fileSystemImplOs.GpFileSystemProviderUsingOs())
+    osInterface.registerOsProvider( osImplNative.GpOsProviderUsingNative())
+    faultProberInterface.registerFaultProber( faultProberImplGpdb.GpFaultProberImplGpdb())
+
+    commandObject = None
+    parser = None
+
+    forceQuiet = mainOptions is not None and mainOptions.get("forceQuietOutput")
+    options = None
+
+    if mainOptions is not None and mainOptions.get("programNameOverride"):
+        global gProgramName
+        gProgramName = mainOptions.get("programNameOverride")
+    suppressStartupLogMessage = mainOptions is not None and mainOptions.get("suppressStartupLogMessage")
+
+    useHelperToolLogging = mainOptions is not None and mainOptions.get("useHelperToolLogging")
+    nonuser = True if mainOptions is not None and mainOptions.get("setNonuserOnToolLogger") else False
+
+    # NOTE: if this logic is changed then also change test_main in testUtils.py
+    try:
+        execname = getProgramName()
+        hostname = unix.getLocalHostname()
+        username = unix.getUserName()
+
+        parser = createOptionParserFn()
+        (options, args) = parser.parse_args()
+
+        if useHelperToolLogging:
+            gplog.setup_helper_tool_logging(execname, hostname, username)
+        else:
+            gplog.setup_tool_logging(execname, hostname, username,
+                                        logdir=options.ensure_value("logfileDirectory", None), nonuser=nonuser )
+
+        if forceQuiet:
+            gplog.quiet_stdout_logging()
+        else:
+            if options.ensure_value("verbose", False):
+                gplog.enable_verbose_logging()
+            if options.ensure_value("quiet", False):
+                gplog.quiet_stdout_logging()
+
+        if options.ensure_value("masterDataDirectory", None) is not None:
+            options.master_data_directory = os.path.abspath(options.masterDataDirectory)
+
+        if not suppressStartupLogMessage:
+            logger.info("Starting %s with args: %s" % (gProgramName, ' '.join(sys.argv[1:])))
+
+        commandObject = createCommandFn(options, args)
+        exitCode = commandObject.run()
+        sys.exit(exitCode)
+
+    except ProgramArgumentValidationException, e:
+        if e.shouldPrintHelp():
+            parser.print_help()
+        logger.error("%s: error: %s" %(gProgramName, e.getMessage()))
+        sys.exit(2)
+    except ExceptionNoStackTraceNeeded, e:
+        logger.error( "%s error: %s" % (gProgramName, e))
+        sys.exit(2)
+    except UserAbortedException, e:
+        logger.info("User abort requested, Exiting...")
+        sys.exit(4)
+    except ExecutionError, e:
+        logger.fatal("Error occurred: %s\n Command was: '%s'\n"
+                     "rc=%d, stdout='%s', stderr='%s'" %\
+                     (e.summary,e.cmd.cmdStr, e.cmd.results.rc, e.cmd.results.stdout,
+                     e.cmd.results.stderr ))
+        sys.exit(2)
+    except Exception, e:
+        if options is None:
+            logger.exception("%s failed.  exiting...", gProgramName)
+        else:
+            if options.ensure_value("verbose", False):
+                logger.exception("%s failed.  exiting...", gProgramName)
+            else:
+                logger.fatal("%s failed. (Reason='%s') exiting..." % (gProgramName, e))
+        sys.exit(2)
+    except KeyboardInterrupt:
+        sys.exit('\nUser Interrupted')
+    finally:
+        if commandObject:
+            commandObject.cleanup()
+
+
+def addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption, includeUsageOption=False):
+    """
+    Add the standard options for help and logging
+    to the specified parser object.
+    """
+    parser.set_usage('%prog [--help] [options] ')
+    parser.remove_option('-h')
+
+    addTo = parser
+    addTo.add_option('-h', '-?', '--help', action='help',
+                      help='show this help message and exit')
+    if includeUsageOption:
+        parser.add_option('--usage', action="briefhelp")
+
+    addTo = OptionGroup(parser, "Logging Options")
+    parser.add_option_group(addTo)
+    addTo.add_option('-v', '--verbose', action='store_true', 
+                      help='debug output.')
+    addTo.add_option('-q', '--quiet', action='store_true',
+                      help='suppress status messages')
+    addTo.add_option("-l", None, dest="logfileDirectory", metavar="<directory>", type="string",
+                  help="Logfile directory")
+
+    if includeNonInteractiveOption:
+        addTo.add_option('-a', dest="interactive" , action='store_false', default=True,
+                    help="quiet mode, do not require user input for confirmations")
+
+
+def addMasterDirectoryOptionForSingleClusterProgram(addTo):
+    """
+    Add the -d master directory option to the specified parser object
+    which is intended to provide the value of the master data directory.
+
+    For programs that operate on multiple clusters at once, this function/option
+    is not appropriate.
+    """
+    addTo.add_option('-d', '--master_data_directory', type='string',
+                        dest="masterDataDirectory",
+                        metavar="<master data directory>",
+                        help="Optional. The master host data directory. If not specified, the value set"\
+                            "for $MASTER_DATA_DIRECTORY will be used.")
+    
+
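+# Option-wiring sketch (hypothetical single-cluster tool): the two helpers
+# above are typically combined when building the parser handed to simple_main.
+#
+#   from optparse import OptionParser
+#
+#   def create_parser():
+#       parser = OptionParser()
+#       addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption=True)
+#       addMasterDirectoryOptionForSingleClusterProgram(parser)
+#       return parser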
+
+#
+# YamlMain
+# 
+
+def get_yaml(targetclass):
+    "get_yaml"
+
+    # doc    - class's doc string
+    # pos    - where YAML starts in doc
+    # ystr   - YAML string extracted from doc
+
+    if not hasattr(targetclass, '_yaml') or targetclass._yaml is None:
+        doc = targetclass.__doc__
+        pos = doc.find('%YAML')
+        assert pos >= 0, "targetclass doc string is missing %YAML plan"
+        ystr  = doc[pos:].replace('\n    ','\n')
+        targetclass._yaml = yaml.load(ystr)
+    return targetclass._yaml
+
+
+class YamlMain:
+    "YamlMain"
+
+    def __init__(self):
+        "Parse arguments based on yaml docstring"
+        self.current       = None
+        self.plan          = None
+        self.scenario_name = None
+        self.logger        = None
+        self.logfilename   = None
+        self.errmsg        = None
+
+        self.parser = YamlOptions(self).parser
+        self.options, self.args = self.parser.parse_args()
+        self.options.quiet = self.options.q
+        self.options.verbose = self.options.v
+
+
+    #
+    # simple_main interface
+    #
+    def __call__(self, *args):
+        "Allows us to use self as the create_parser and create_program functions in call to simple_main"
+        return self
+
+    def parse_args(self):
+        "Called by simple_main to obtain results from parser returned by create_parser"
+        return self.options, self.args
+
+    def run(self):
+        "Called by simple_main to execute the program returned by create_program"
+        self.plan = Plan(self)
+        self.scenario_name = self.plan.name
+        self.logger        = self.plan.logger
+        self.logfilename   = self.plan.logfilename
+        self.errmsg        = self.plan.errmsg
+        self.current       = []
+        self.plan.run()
+
+    def cleanup(self):
+        "Called by simple_main to cleanup after program returned by create_program finishes"
+        pass
+
+    def simple(self):
+        "Delegates setup and control to mainUtils.simple_main"
+        simple_main(self, self)
+
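+# Plan sketch (hypothetical subclass; the tool name, options and steps below
+# are illustrative only): a YamlMain subclass embeds its option and scenario
+# definitions in its class doc string, roughly in this shape.
+#
+#   class MyTool(YamlMain):
+#       """
+#       %YAML 1.1
+#       ---
+#       Description: what the tool does
+#       Usage: 'mytool [options]'
+#       Options:
+#           Groups: [Help Options, Logging Options]
+#           Help Options:
+#               -h,--help: {action: help, help: show this help message and exit}
+#               --scenario: {type: string, dest: scenario, help: hidden scenario name}
+#           Logging Options:
+#               -v: verbose output
+#               -q: quiet mode
+#       Default Scenario: Do Everything
+#       Scenarios:
+#           Do Everything:
+#           - 1 Main:
+#              - 1.1 First Step
+#       Errors: {}
+#       """
+#       def first_step(self):
+#           self.logger.info('first step')   # '1.1 First Step' resolves to first_step()
+#
+#   # MyTool().simple() parses the options from the YAML and runs the plan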
+
+#
+# option parsing
+#
+
+class YamlOptions:
+    "YamlOptions"
+
+    def __init__(self, target):
+        """
+        Scan the class doc string of the given object, looking for the %YAML
+        section containing the option specification.  Parse the YAML and set up
+        the corresponding OptionParser object.
+        """
+        # target - options object (input)
+        # gname  - option group name
+
+        self.y      = get_yaml(target.__class__)
+        self.parser = OptionParser( description=self.y['Description'], version='%prog version $Revision$')
+        self.parser.remove_option('-h')
+        self.parser.set_usage(self.y['Usage'])
+        self.opty   = self.y['Options']
+        for gname in self.opty.get('Groups', []):
+            self._register_group(gname)
+            
+
+    def _register_group(self, gname):
+        """
+        Register the options for the named option group with the OptionParser,
+        wrapping them in an OptionGroup.  If the group name starts with 'Help',
+        the options are registered directly on the top-level OptionParser instead.
+        """
+        # gname    - option group name (input)
+        # gy       - option group YAML object
+        # grp      - option group object
+        # tgt      - where to add options (parser or option group)
+        # optkey   - comma separated list of option flags
+        # optval   - help string or dict with detailed option settings
+        # listargs - list of option flags (e.g. ['-h', '--help'])
+        # dictargs - key/value arguments to add_option
+
+        gy = self.opty.get(gname, None)
+        if gname.startswith('Help'): 
+            grp = None
+            tgt = self.parser
+        else:
+            grp = OptionGroup(self.parser, gname)
+            tgt = grp
+
+        for optkey, optval in gy.items():
+            listargs = optkey.split(',')
+            if type(optval) == type(''):
+                # short form: optval is just a help string
+                dictargs = {
+                    'action': 'store_true',
+                    'help':   optval
+                }
+            else:
+                # optval is the complete option specification
+                dictargs = optval
+
+            # hide hidden options
+            if dictargs.get('help','').startswith('hidden'):
+                dictargs['help'] = SUPPRESS_HELP
+
+            #print 'adding', listargs, dictargs
+            tgt.add_option(*listargs, **dictargs)
+
+        if grp is not None:
+            self.parser.add_option_group(grp)
+
+
+
+#
+# plan execution
+#
+
+class Task:
+    "Task"
+
+    def __init__(self, key, name, subtasks=None):
+        self.Key      = key         # task key
+        self.Name     = name        # task name
+        self.SubTasks = subtasks    # subtasks, if any
+        self.Func     = None        # task function, set by _task
+
+
+    def _print(self, main, prefix):
+        print '%s %s %s:' % (prefix, self.Key, self.Name)
+
+    def _debug(self, main, prefix):
+        main.logger.debug('Execution Plan:%s %s %s%s' % (prefix, self.Key, self.Name, ':' if self.SubTasks else ''))
+
+    def _run(self, main, prefix):
+        main.logger.debug(' Now Executing:%s %s %s' % (prefix, self.Key, self.Name))
+        if self.Func:
+            self.Func()
+        
+
+class Exit(Exception):
+    def __init__(self, rc, code=None, call_support=False):
+        Exception.__init__(self)
+        self.code         = code
+        self.prm          = sys._getframe(1).f_locals
+        self.rc           = rc
+        self.call_support = call_support
+
+
+class Plan:
+    "Plan"
+
+    def __init__(self, main):
+        """
+        Parse the YAML plan embedded in the class doc string of the given object
+        (everything from the %YAML marker onward, cached by get_yaml) and build
+        the plan's stages and tasks for the selected scenario.
+        """
+        # main - object with yaml scenarios (input)
+        # sy   - Stage yaml
+
+        self.logger      = gplog.get_default_logger()
+        self.logfilename = gplog.get_logfile()
+
+        self.main        = main
+        self.y           = get_yaml(main.__class__)
+        self.name        = main.options.scenario
+        if not self.name:
+            self.name    = self.y['Default Scenario']
+        self.scenario    = self.y['Scenarios'][self.name]
+        self.errors      = self.y['Errors']
+        self.Tasks       = [ self._task(ty) for ty in self.scenario ]
+
+
+    def _task(self, ty):
+        "Invoked by __init__ to build a top-level task from the YAML"
+
+        # ty   - Task yaml (input)
+        # tyk  - Task yaml key
+        # tyv  - Task yaml value
+        # sty  - Sub Task yaml
+        # t    - Task (returned)
+
+        for tyk, tyv in ty.items():
+            key, workers = tyk.split(None, 1)
+            subtasks = [ self._subtask(sty) for sty in tyv ]
+            t = Task(key, workers, subtasks)
+            return t
+
+    def _subtask(self, sty):
+        "Invoked by _stage to build a task from the YAML"
+
+        # sty  - Sub Task yaml (input)
+        # st   - Sub Task (returned)
+
+        key, rest = sty.split(None, 1)
+        st = Task(key, rest)
+        fn = st.Name.lower().replace(' ','_')
+        try:
+            st.Func = getattr(self.main, fn)
+        except AttributeError, e:
+            raise Exception("Failed to lookup '%s' for sub task '%s': %s" % (fn, st.Name, str(e)))
+        return st
+
+
+
+    def _dotasks(self, subtasks, prefix, action):
+        "Apply an action to each subtask recursively"
+
+        # st   - Sub Task
+
+        for st in subtasks or []:
+            self.main.current.append(st)
+            action(st, self.main, prefix)
+            self._dotasks(st.SubTasks, '  '+prefix, action)
+            self.main.current.pop()
+
+
+    def _print(self):
+        "Print in YAML form."
+
+        print '%s:' % self.name
+        self._dotasks(self.Tasks, ' -', lambda t,m,p:t._print(m,p))
+
+
+    def run(self):
+        "Run the stages and tasks."
+
+        self.logger.debug('Execution Plan: %s' % self.name)
+        self._dotasks(self.Tasks, ' -', lambda t,m,p:t._debug(m,p))
+                
+        self.logger.debug(' Now Executing: %s' % self.name)
+        try:
+            self._dotasks(self.Tasks, ' -', lambda t,m,p:t._run(m,p))
+        except Exit, e:
+            self.exit(e.code, e.prm, e.rc, e.call_support)
+
+
+    def errmsg(self, code, prm={}):
+        "Return a formatted error message"
+        return self.errors[code] % prm
+        
+
+    def exit(self, code=None, prm={}, rc=1, call_support=False):
+        "Terminate the application"
+        if code:
+            msg = self.errmsg(code, prm)
+            self.logger.error(msg)
+        if call_support:
+            self.logger.error('Please send %s to Greenplum support.' % self.logfilename)
+        self.logger.debug('exiting with status %(rc)s' % locals())
+        sys.exit(rc)
+
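+# Error-exit sketch (hypothetical task method and error code): a task raises
+# Exit with a code defined in the plan's Errors map; Plan.run catches it,
+# formats the message with the local variables captured by Exit, and terminates
+# with the requested return code.
+#
+#   # in the %YAML plan:   Errors: {noconf: "configuration %(path)s not found"}
+#
+#   def load_configuration(self):
+#       path = '/data/master/hawq-1.0'   # hypothetical path
+#       if not os.path.exists(path):
+#           raise Exit(1, 'noconf')      # logs "configuration ... not found" and exits with rc 1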

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/programs/__init__.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/programs/__init__.py b/tools/bin/hawqpylib/programs/__init__.py
new file mode 100644
index 0000000..e69de29

