hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lil...@apache.org
Subject [1/2] incubator-hawq git commit: HAWQ-275. Add connection check when dispatcher wants to reuse the QE forked by former queries, and when dispatcher dispatches the detailed query command to QE
Date Mon, 22 Feb 2016 01:17:09 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 27b12112f -> e6cabe821


HAWQ-275. Add connection check when dispatcher wants to reuse the QE forked by former queries,
and when dispatcher dispatches the detailed query command to QE


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/ca7afdce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/ca7afdce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/ca7afdce

Branch: refs/heads/master
Commit: ca7afdce5481fb1aaee90bd759eaffb022d9a703
Parents: 629ecd7
Author: Lili Ma <lma@pivotal.io>
Authored: Mon Feb 22 09:10:55 2016 +0800
Committer: Lili Ma <lma@pivotal.io>
Committed: Mon Feb 22 09:10:55 2016 +0800

----------------------------------------------------------------------
 src/backend/cdb/dispatcher.c    | 37 ++++++++++++++++++++++++++++++
 src/backend/cdb/executormgr.c   | 44 ++++++++++++++++++++++++------------
 src/backend/cdb/motion/ic_udp.c | 43 +----------------------------------
 src/include/cdb/dispatcher.h    |  2 ++
 4 files changed, 69 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ca7afdce/src/backend/cdb/dispatcher.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/dispatcher.c b/src/backend/cdb/dispatcher.c
index f72aa35..57ccdf4 100644
--- a/src/backend/cdb/dispatcher.c
+++ b/src/backend/cdb/dispatcher.c
@@ -1803,3 +1803,40 @@ dispatcher_print_statistics(StringInfo buf, DispatchData *data)
 			INSTR_TIME_GET_MILLISEC(data->time_total_free) / data->num_of_dispatched);
 }
 
+
+
+/*
+ * Check the connection from the dispatcher to verify that it is still there.
+ * Return true if the dispatcher connection is still alive.
+ */
+bool dispatch_validate_conn(pgsocket sock)
+{
+  ssize_t   ret;
+  char    buf;
+
+  if (sock < 0)
+    return false;
+
+#ifndef WIN32
+    ret = recv(sock, &buf, 1, MSG_PEEK|MSG_DONTWAIT);
+#else
+    ret = recv(sock, &buf, 1, MSG_PEEK|MSG_PARTIAL);
+#endif
+
+  if (ret == 0) /* socket has been closed. EOF */
+    return false;
+
+  if (ret > 0) /* data waiting on socket, it must be OK. */
+    return true;
+
+  if (ret == -1) /* error, or would be block. */
+  {
+    if (errno == EAGAIN || errno == EINPROGRESS || errno == EWOULDBLOCK)
+      return true; /* connection intact, no data available */
+    else
+      return false;
+  }
+  /* not reached */
+
+  return true;
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ca7afdce/src/backend/cdb/executormgr.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/executormgr.c b/src/backend/cdb/executormgr.c
index 6216d23..4d1351d 100644
--- a/src/backend/cdb/executormgr.c
+++ b/src/backend/cdb/executormgr.c
@@ -361,6 +361,14 @@ executormgr_cancel(QueryExecutor *executor)
 	return success;
 }
 
+static bool
+executormgr_validate_conn(PGconn *conn)
+{
+  if (conn == NULL)
+    return false;
+  return dispatch_validate_conn(conn->sock);
+}
+
 /*
  * executormgr_is_dispatchable
  *	Return the true iff executor can receive query.
@@ -379,7 +387,7 @@ executormgr_is_dispatchable(QueryExecutor *executor)
 			return false;
 	}
 
-	if (PQstatus(conn) == CONNECTION_BAD)
+	if (!executormgr_validate_conn(conn) || PQstatus(conn) == CONNECTION_BAD)
 	{
 		write_log("function executormgr_is_dispatchable meets error, connection is bad.");
 		executormgr_catch_error(executor);
@@ -389,9 +397,6 @@ executormgr_is_dispatchable(QueryExecutor *executor)
 	return true;
 }
 
-
-
-
 /*
  * executormgr_dispatch_and_run
  *	Dispatch data and run the query.
@@ -670,21 +675,30 @@ executormgr_free_takeovered_segment_conn(SegmentDatabaseDescriptor *desc)
 }
 
 static SegmentDatabaseDescriptor *
-executormgr_allocate_any_executor(bool is_writer)
+executormgr_allocate_any_executor(bool is_writer, bool is_entrydb)
 {
-	return poolmgr_get_random_item(executor_cache.pool);
-}	
-
-static SegmentDatabaseDescriptor *
-executormgr_allocate_executor_on_entrydb(bool is_writer)
-{
-  return poolmgr_get_random_item(executor_cache.entrydb_pool);
+  // get executor from pool and check whether the connection is valid, keep
+  // running until finding a valid one or the pool becomes NULL
+  struct PoolMgrState *executor_pool =
+      is_entrydb ? executor_cache.entrydb_pool : executor_cache.pool;
+  SegmentDatabaseDescriptor *desc = poolmgr_get_random_item(executor_pool);
+  while (desc != NULL && !executormgr_validate_conn(desc->conn)) {
+    desc = poolmgr_get_random_item(executor_pool);
+  }
+  return desc;
 }
 
 static SegmentDatabaseDescriptor *
 executormgr_allocate_executor_by_name(const char *name, bool is_writer)
 {
-	return poolmgr_get_item_by_name(executor_cache.pool, name);
+  // get executor from pool and check whether the connection is valid, keep
+  // running until finding a valid one or the pool becomes NULL
+  SegmentDatabaseDescriptor *desc =
+      poolmgr_get_item_by_name(executor_cache.pool, name);
+  while (desc != NULL && !executormgr_validate_conn(desc->conn)) {
+    desc = poolmgr_get_item_by_name(executor_cache.pool, name);
+  }
+  return desc;
 }	
 
 /*
@@ -697,9 +711,9 @@ executormgr_allocate_executor(Segment *segment, bool is_writer, bool is_entrydb)
 	SegmentDatabaseDescriptor *ret;
 
 	if (is_entrydb || (segment != NULL && segment->master))
-	  ret = executormgr_allocate_executor_on_entrydb(is_writer);
+	  ret = executormgr_allocate_any_executor(is_writer, true);
 	else if (segment == NULL)
-		ret = executormgr_allocate_any_executor(is_writer);
+		ret = executormgr_allocate_any_executor(is_writer, false);
 	else
 		ret = executormgr_allocate_executor_by_name(GetSegmentHashKey(segment), is_writer);
 	if (!ret)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ca7afdce/src/backend/cdb/motion/ic_udp.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/motion/ic_udp.c b/src/backend/cdb/motion/ic_udp.c
index 56201bb..e5547f8 100644
--- a/src/backend/cdb/motion/ic_udp.c
+++ b/src/backend/cdb/motion/ic_udp.c
@@ -5876,54 +5876,13 @@ formatSockAddr(struct sockaddr *sa, char* buf, int bufsize)
 }								/* formatSockAddr */
 
 /*
- * dispatcherAYT
- * 		Check the connection from the dispatcher to verify that it is still there.
- *
- * The connection is a struct Port, stored in the global MyProcPort.
- *
- * Return true if the dispatcher connection is still alive.
- */
-static bool
-dispatcherAYT(void)
-{
-	ssize_t		ret;
-	char		buf;
-
-	if (MyProcPort->sock < 0)
-		return false;
-
-#ifndef WIN32
-		ret = recv(MyProcPort->sock, &buf, 1, MSG_PEEK|MSG_DONTWAIT);
-#else
-		ret = recv(MyProcPort->sock, &buf, 1, MSG_PEEK|MSG_PARTIAL);
-#endif
-
-	if (ret == 0) /* socket has been closed. EOF */
-		return false;
-
-	if (ret > 0) /* data waiting on socket, it must be OK. */
-		return true;
-
-	if (ret == -1) /* error, or would be block. */
-	{
-		if (errno == EAGAIN || errno == EINPROGRESS)
-			return true; /* connection intact, no data available */
-		else
-			return false;
-	}
-	/* not reached */
-
-	return true;
-}
-
-/*
  * checkQDConnectionAlive
  * 		Check whether QD connection is still alive. If not, report error.
  */
 static void
 checkQDConnectionAlive(void)
 {
-	if (!dispatcherAYT())
+	if (!dispatch_validate_conn(MyProcPort->sock))
 	{
 		if (Gp_role == GP_ROLE_EXECUTE)
 			ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ca7afdce/src/include/cdb/dispatcher.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/dispatcher.h b/src/include/cdb/dispatcher.h
index 3175705..7b6db91 100644
--- a/src/include/cdb/dispatcher.h
+++ b/src/include/cdb/dispatcher.h
@@ -84,6 +84,8 @@ extern void dispatcher_print_statistics(StringInfo buf, struct DispatchData *dat
 extern ProcessIdentity *dispatch_get_task_identity(struct DispatchTask *task);
 extern struct DispatchCommandQueryParms *dispatcher_get_QueryParms(struct DispatchData *data);
 
+extern bool dispatch_validate_conn(pgsocket sock);
+
 void dispatch_end_env(struct DispatchData *data);
 void free_dispatch_data(struct DispatchData *data);
 


Mime
View raw message