hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lil...@apache.org
Subject incubator-hawq git commit: HAWQ-275. Add connection check when dispatcher wants to reuse the QE forked by former queries, and when dispatcher dispatches the detailed query command to QE
Date Fri, 19 Feb 2016 01:11:21 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/HAWQ-275 [created] f8dfd675e


HAWQ-275. Add connection check when dispatcher wants to reuse the QE forked by former queries,
and when dispatcher dispatches the detailed query command to QE


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/f8dfd675
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/f8dfd675
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/f8dfd675

Branch: refs/heads/HAWQ-275
Commit: f8dfd675e8aa11ac5a03a38f1d455f4090115794
Parents: 629ecd7
Author: Lili Ma <lma@pivotal.io>
Authored: Fri Feb 19 09:10:58 2016 +0800
Committer: Lili Ma <lma@pivotal.io>
Committed: Fri Feb 19 09:10:58 2016 +0800

----------------------------------------------------------------------
 src/backend/cdb/executormgr.c | 69 +++++++++++++++++++++++++++++---------
 1 file changed, 54 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f8dfd675/src/backend/cdb/executormgr.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/executormgr.c b/src/backend/cdb/executormgr.c
index 6216d23..107e860 100644
--- a/src/backend/cdb/executormgr.c
+++ b/src/backend/cdb/executormgr.c
@@ -361,6 +361,39 @@ executormgr_cancel(QueryExecutor *executor)
 	return success;
 }
 
+static bool
+executormgr_validate_conn(PGconn *conn)
+{
+  ssize_t   ret;
+  char    buf;
+
+  if (conn->sock < 0)
+    return false;
+
+#ifndef WIN32
+    ret = recv(conn->sock, &buf, 1, MSG_PEEK|MSG_DONTWAIT);
+#else
+    ret = recv(conn->sock, &buf, 1, MSG_PEEK|MSG_PARTIAL);
+#endif
+
+  if (ret == 0) /* socket has been closed. EOF */
+    return false;
+
+  if (ret > 0) /* data waiting on socket, it must be OK. */
+    return true;
+
+  if (ret == -1) /* error, or would be block. */
+  {
+    if (errno == EAGAIN || errno == EINPROGRESS)
+      return true; /* connection intact, no data available */
+    else
+      return false;
+  }
+  /* not reached */
+
+  return true;
+}
+
 /*
  * executormgr_is_dispatchable
  *	Return the true iff executor can receive query.
@@ -379,7 +412,7 @@ executormgr_is_dispatchable(QueryExecutor *executor)
 			return false;
 	}
 
-	if (PQstatus(conn) == CONNECTION_BAD)
+	if (!executormgr_validate_conn(conn)|| PQstatus(conn) == CONNECTION_BAD)
 	{
 		write_log("function executormgr_is_dispatchable meets error, connection is bad.");
 		executormgr_catch_error(executor);
@@ -389,9 +422,6 @@ executormgr_is_dispatchable(QueryExecutor *executor)
 	return true;
 }
 
-
-
-
 /*
  * executormgr_dispatch_and_run
  *	Dispatch data and run the query.
@@ -670,21 +700,30 @@ executormgr_free_takeovered_segment_conn(SegmentDatabaseDescriptor *desc)
 }
 
 static SegmentDatabaseDescriptor *
-executormgr_allocate_any_executor(bool is_writer)
+executormgr_allocate_any_executor(bool is_writer, bool is_entrydb)
 {
-	return poolmgr_get_random_item(executor_cache.pool);
-}	
-
-static SegmentDatabaseDescriptor *
-executormgr_allocate_executor_on_entrydb(bool is_writer)
-{
-  return poolmgr_get_random_item(executor_cache.entrydb_pool);
+  // get executor from pool and check whether the connection is valid, keep
+  // running until finding a valid one or the pool becomes NULL
+  struct PoolMgrState *executor_pool =
+      is_entrydb ? executor_cache.entrydb_pool : executor_cache.pool;
+  SegmentDatabaseDescriptor *desc = poolmgr_get_random_item(executor_pool);
+  while (desc != NULL && !executormgr_validate_conn(desc->conn)) {
+    desc = poolmgr_get_random_item(executor_pool);
+  }
+  return desc;
 }
 
 static SegmentDatabaseDescriptor *
 executormgr_allocate_executor_by_name(const char *name, bool is_writer)
 {
-	return poolmgr_get_item_by_name(executor_cache.pool, name);
+  // get executor from pool and check whether the connection is valid, keep
+  // running until finding a valid one or the pool becomes NULL
+  SegmentDatabaseDescriptor *desc =
+      poolmgr_get_item_by_name(executor_cache.pool, name);
+  while (desc != NULL && !executormgr_validate_conn(desc->conn)) {
+    desc = poolmgr_get_item_by_name(executor_cache.pool, name);
+  }
+  return desc;
 }	
 
 /*
@@ -697,9 +736,9 @@ executormgr_allocate_executor(Segment *segment, bool is_writer, bool is_entrydb)
 	SegmentDatabaseDescriptor *ret;
 
 	if (is_entrydb || (segment != NULL && segment->master))
-	  ret = executormgr_allocate_executor_on_entrydb(is_writer);
+	  ret = executormgr_allocate_any_executor(is_writer, true);
 	else if (segment == NULL)
-		ret = executormgr_allocate_any_executor(is_writer);
+		ret = executormgr_allocate_any_executor(is_writer, false);
 	else
 		ret = executormgr_allocate_executor_by_name(GetSegmentHashKey(segment), is_writer);
 	if (!ret)


Mime
View raw message