From commits-return-7793-archive-asf-public=cust-asf.ponee.io@trafodion.apache.org Sat Jun 16 19:09:45 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id ECB4B1807F2 for ; Sat, 16 Jun 2018 19:09:33 +0200 (CEST) Received: (qmail 4669 invoked by uid 500); 16 Jun 2018 17:09:32 -0000 Mailing-List: contact commits-help@trafodion.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@trafodion.apache.org Delivered-To: mailing list commits@trafodion.apache.org Received: (qmail 3950 invoked by uid 99); 16 Jun 2018 17:09:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 16 Jun 2018 17:09:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CA442E11C8; Sat, 16 Jun 2018 17:09:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sureshs@apache.org To: commits@trafodion.apache.org Date: Sat, 16 Jun 2018 17:10:04 -0000 Message-Id: <85b55698a686407494cea5360c408f66@git.apache.org> In-Reply-To: <08c34b2c44164fae8ff4de2af25fadbf@git.apache.org> References: <08c34b2c44164fae8ff4de2af25fadbf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [34/50] [abbrv] trafodion git commit: Scale testing fixes. Scale testing fixes. Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/3931a75f Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/3931a75f Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/3931a75f Branch: refs/heads/master Commit: 3931a75fad052df35072a7109aa5e4babbb90f38 Parents: db65660 Author: Zalo Correa Authored: Mon May 14 06:24:43 2018 -0700 Committer: Zalo Correa Committed: Mon May 14 06:24:43 2018 -0700 ---------------------------------------------------------------------- .../export/include/common/evl_sqlog_eventnum.h | 34 +- core/sqf/export/include/trafconf/trafconfig.h | 8 +- core/sqf/monitor/linux/cluster.cxx | 431 +++++++++++++++---- core/sqf/monitor/linux/cluster.h | 12 +- core/sqf/monitor/linux/commaccept.cxx | 4 +- core/sqf/monitor/linux/healthcheck.cxx | 25 +- core/sqf/monitor/linux/healthcheck.h | 1 + core/sqf/monitor/linux/internal.h | 10 +- core/sqf/monitor/linux/monitor.cxx | 143 ++++-- core/sqf/monitor/linux/msgdef.h | 1 + core/sqf/monitor/linux/nameserver.cxx | 9 +- core/sqf/monitor/linux/nsprocess.cxx | 6 +- core/sqf/monitor/linux/nsreqnewproc.cxx | 1 + core/sqf/monitor/linux/nsreqqueue.cxx | 2 +- core/sqf/monitor/linux/pnode.cxx | 35 +- core/sqf/monitor/linux/process.cxx | 100 +++-- core/sqf/monitor/linux/process.h | 3 +- core/sqf/monitor/linux/ptpclient.cxx | 187 +++++++- core/sqf/monitor/linux/ptpclient.h | 10 + core/sqf/monitor/linux/ptpcommaccept.cxx | 41 +- core/sqf/monitor/linux/redirector.cxx | 88 +++- core/sqf/monitor/linux/replicate.cxx | 9 + core/sqf/monitor/linux/replicate.h | 2 + core/sqf/monitor/linux/reqdump.cxx | 9 +- core/sqf/monitor/linux/reqnewproc.cxx | 3 +- core/sqf/monitor/linux/reqprocinfo.cxx | 151 ++++--- core/sqf/monitor/linux/reqqueue.cxx | 282 +++++++++++- core/sqf/monitor/linux/reqqueue.h | 48 +++ core/sqf/monitor/linux/shell.cxx | 15 +- core/sqf/monitor/linux/system.cxx | 2 +- core/sqf/monitor/linux/tmsync.cxx | 12 + core/sqf/monitor/linux/zclient.cxx | 120 +++++- core/sqf/monitor/linux/zclient.h | 2 + core/sqf/monitor/linux/zootest.cxx | 8 + core/sqf/monitor/test/monitor.env | 40 +- core/sqf/monitor/test/sqconfig.monitor.virtual | 6 +- core/sqf/sqenvcom.sh | 30 +- core/sqf/sql/scripts/monitor.env | 2 +- core/sqf/sql/scripts/sqconfig | 6 +- core/sqf/sql/scripts/sqnameserver.pm | 4 +- core/sqf/sql/scripts/sqnodes.pm | 4 +- 41 files changed, 1526 insertions(+), 380 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/export/include/common/evl_sqlog_eventnum.h ---------------------------------------------------------------------- diff --git a/core/sqf/export/include/common/evl_sqlog_eventnum.h b/core/sqf/export/include/common/evl_sqlog_eventnum.h index e9186e1..2d630c9 100644 --- a/core/sqf/export/include/common/evl_sqlog_eventnum.h +++ b/core/sqf/export/include/common/evl_sqlog_eventnum.h @@ -85,7 +85,8 @@ #define MON_CLUSTER_MARKDOWN_2 101011102 #define MON_CLUSTER_MARKDOWN_3 101011103 //#define MON_CLUSTER_MARKUP 101011201 -#define MON_CLUSTER_NODE_TM_READY 101011300 +#define MON_CLUSTER_NODE_TM_READY_1 101011301 +#define MON_CLUSTER_NODE_TM_READY_2 101011302 #define MON_CLUSTER_FORCEDOWN_1 101011401 #define MON_CLUSTER_FORCEDOWN_2 101011402 #define MON_CLUSTER_CLUSTER_MANAGER 101011501 @@ -245,6 +246,13 @@ #define MON_PINGSOCKPEER_1 101015201 #define MON_PINGSOCKPEER_2 101015202 +#define MON_PINGSOCKPEER_3 101015203 +#define MON_PINGSOCKPEER_4 101015204 + +#define MON_CLUSTER_ASSIGNMONITORLEADER_1 101015301 +#define MON_CLUSTER_ASSIGNMONITORLEADER_2 101015302 +#define MON_CLUSTER_ASSIGNMONITORLEADER_3 101015303 +#define MON_CLUSTER_ASSIGNMONITORLEADER_4 101015304 /* Module: monitor.cxx = 02 */ @@ -265,6 +273,7 @@ #define MON_MONITOR_MAIN_15 101020115 #define MON_MONITOR_MAIN_16 101020116 #define MON_MONITOR_MAIN_17 101020117 +#define MON_MONITOR_MAIN_18 101020118 #define MON_MONITOR_TMLEADER_1 101020201 #define MON_MONITOR_TMLEADER_2 101020202 #define MON_MONITOR_DEATH_HANDLER_1 101020301 @@ -724,16 +733,19 @@ #define MON_REQ_NAMESERVER_DELETE_3 101181203 #define MON_REQ_NAMESERVER_DELETE_4 101181204 #define MON_REQ_NAMESERVER_INFO_1 101181301 - #define MON_REQ_EVALREQ_PERFORMANCE_1 101181501 #define MON_INTREQ_CLONEPROC_1 101181601 #define MON_INTREQ_EXIT_1 101181701 #define MON_INTREQ_NEWPROC_1 101181801 +#define MON_INTREQ_NEWPROC_2 101181802 #define MON_INTREQ_NOTIFY_1 101181901 #define MON_INTREQ_NOTIFY_2 101181902 #define MON_INTREQ_NOTIFY_3 101181903 #define MON_INTREQ_NOTIFY_4 101181904 #define MON_INTREQ_OPEN_1 101182001 +#define MON_REQ_IODATA_1 101182201 +#define MON_REQ_STDIN_1 101182301 +#define MON_REQ_STDIN_2 101182302 /* Module: clio.cxx = 19 */ #define MON_CLIO_ACQUIRE_MSG_1 101190101 @@ -785,7 +797,8 @@ #define MON_HEALTHCHECK_WAKEUP_1 101230401 #define MON_HEALTHCHECK_WAKEUP_2 101230402 #define MON_HEALTHCHECK_Q_BLOCK 101230501 -#define MON_HEALTHCHECK_STOP_NS_1 101230601 + +#define MON_HEALTHCHECK_STOP_NS_1 101230701 /* Module: sdtimer.cxx = 24 */ #define MON_SDTIMER_SOFTDOG_TH_1 101240101 @@ -945,6 +958,7 @@ #define MON_ZCLIENT_WATCHMASTERNODEDELETE_2 101372106 #define MON_ZCLIENT_WATCHMASTERNODEDELETE_3 101372107 #define MON_ZCLIENT_CREATEORSETMASTERWATCH 101372108 +#define MON_ZCLIENT_CREATEORSETMASTERINFO 101372109 /* Module: zconfig.cxx = 38 */ #define ZCONFIG_ZCONFIG_1 101380101 @@ -1018,6 +1032,20 @@ /* Module ptpclient.cxx = 93 */ #define PTPCLIENT_PTPCLIENT_1 101930101 +#define PTPCLIENT_STDINREQ_1 101930201 +#define PTPCLIENT_STDINREQ_2 101930202 +#define PTPCLIENT_STDIODATA_1 101930301 +#define PTPCLIENT_STDIODATA_2 101930302 + +/* Module ptpcommaccept.cxx = 94 */ +#define PTP_COMMACCEPT_1 101940101 +#define PTP_COMMACCEPT_2 101940102 +#define PTP_COMMACCEPT_3 101940103 +#define PTP_COMMACCEPT_4 101940104 +#define PTP_COMMACCEPT_5 101940105 +#define PTP_COMMACCEPT_6 101940106 +#define PTP_COMMACCEPT_7 101940107 +#define PTP_COMMACCEPT_8 101940108 /**********************************************/ http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/export/include/trafconf/trafconfig.h ---------------------------------------------------------------------- diff --git a/core/sqf/export/include/trafconf/trafconfig.h b/core/sqf/export/include/trafconf/trafconfig.h index d68a8d0..2eba1c8 100644 --- a/core/sqf/export/include/trafconf/trafconfig.h +++ b/core/sqf/export/include/trafconf/trafconfig.h @@ -50,7 +50,11 @@ #define TC_PERSIST_KEY_MAX 64 #define TC_PERSIST_VALUE_MAX 4096 #define TC_PERSIST_KEYS_VALUE_MAX 4096 -#define TC_NODES_MAX 1024 +#define TC_NODES_MAX 1024 // This can be higher when needed and + // will have performance implications + // in the monitor process. + // NOTE: Must increment by 64, see + // monitor's msgdef.h MAX_NODES #define TC_SPARE_NODES_MAX 256 #define TC_UNIQUE_STRING_VALUE_MAX 4096 @@ -111,7 +115,7 @@ typedef enum { TCDBSTOREUNDEFINED = 0 , TCDBMYSQL = 1 // MySQL Database , TCDBPOSTGRESQL = 2 // PostgresQL Database [TBD] - , TCDBSQLITE = 3 // Sqlite Database [deprecated] + , TCDBSQLITE = 3 // Sqlite Database } TcStorageType_t; typedef enum { http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/cluster.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx index 4646433..a7ccdf1 100644 --- a/core/sqf/monitor/linux/cluster.cxx +++ b/core/sqf/monitor/linux/cluster.cxx @@ -1,4 +1,4 @@ -// /////////////////////////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////////////////////////// // // @@@ START COPYRIGHT @@@ // @@ -99,7 +99,6 @@ extern CommType_t CommType; extern int MyPNID; extern CReqQueue ReqQueue; -extern char Node_name[MPI_MAX_PROCESSOR_NAME]; extern CMonitor *Monitor; extern CNodeContainer *Nodes; @@ -354,13 +353,13 @@ void CCluster::NodeTmReady( int nid ) char la_buf[MON_STRING_BUF_SIZE]; sprintf( la_buf, "[%s], Soft Node up! pnid=%d, name=(%s)\n" , method_name, MyNode->GetPNid(), MyNode->GetName()); - mon_log_write(MON_CLUSTER_NODE_TM_READY, SQ_LOG_INFO, la_buf); + mon_log_write(MON_CLUSTER_NODE_TM_READY_1, SQ_LOG_INFO, la_buf); } else { char la_buf[MON_STRING_BUF_SIZE]; sprintf(la_buf, "[%s], Node activated! pnid=%d, name=(%s) \n", method_name, MyNode->GetPNid(), MyNode->GetName()); - mon_log_write(MON_CLUSTER_NODE_TM_READY, SQ_LOG_INFO, la_buf); + mon_log_write(MON_CLUSTER_NODE_TM_READY_2, SQ_LOG_INFO, la_buf); // Let other monitors know the node is up CReplActivateSpare *repl = new CReplActivateSpare( MyPNID, -1 ); @@ -398,9 +397,30 @@ void CCluster::NodeReady( CNode *spareNode ) TRACE_EXIT; } +void CCluster::UpdateMonitorPort (const char* newMaster) +{ + const char method_name[] = "CCluster::UpdateMonitorPort"; + TRACE_ENTRY; + + char *monitorPort = getenv ("MONITOR_COMM_PORT"); + if ((monitorPort) && (newMaster)) + { + strcpy( IntegratingMonitorPort, newMaster ); + strcat( IntegratingMonitorPort, ":"); + strcat( IntegratingMonitorPort, monitorPort); + + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf("%s@%d" " (MasterMonitor) UpdateMonitorPort Updating IntegratingMonitorPort to %s\n", + method_name, __LINE__,IntegratingMonitorPort ); + } + } + TRACE_EXIT; +} + // Assign leaders as required // Current leaders are TM Leader and Monitor Leader -void CCluster::AssignLeaders( int pnid, bool checkProcess ) +void CCluster::AssignLeaders( int pnid, const char* failedMaster, bool checkProcess ) { const char method_name[] = "CCluster::AssignLeaders"; TRACE_ENTRY; @@ -408,31 +428,31 @@ void CCluster::AssignLeaders( int pnid, bool checkProcess ) #ifndef NAMESERVER_PROCESS AssignTmLeader ( pnid, checkProcess ); #else + pnid = pnid; checkProcess = checkProcess; #endif - AssignMonitorLeader ( pnid ); + AssignMonitorLeader ( failedMaster ); TRACE_EXIT; } -// Assign montior lead in the case of failure -void CCluster::AssignMonitorLeader( int pnid ) +// Assign monitor lead in the case of failure +void CCluster::AssignMonitorLeader( const char* failedMaster ) { const char method_name[] = "CCluster::AssignMonitorLeader"; TRACE_ENTRY; int i = 0; int rc = 0; - - int monitorLeaderPNid = monitorLeaderPNid_; + + int monitorLeaderPNid = -1; CNode *node = NULL; - - if (monitorLeaderPNid_ != pnid) + + if (failedMaster == NULL) { if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) { - trace_printf( "%s@%d" " - (MasterMonitor) returning, pnid %d != monitorLead %d\n" - , method_name, __LINE__, pnid, monitorLeaderPNid_ ); + trace_printf( "%s@%d" " - (MasterMonitor) failedMaster is NULL, returning\n" , method_name, __LINE__); } TRACE_EXIT; return; @@ -440,22 +460,93 @@ void CCluster::AssignMonitorLeader( int pnid ) if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) { - trace_printf( "%s@%d" " - (MasterMonitor) Node " "%d" " MonitorLeader failed!\n" - , method_name, __LINE__, monitorLeaderPNid_ ); + trace_printf( "%s@%d" " - (MasterMonitor) " " MonitorLeader (%s) failed!\n" + , method_name, __LINE__, failedMaster ); } - for (i=0; iWaitForAndReturnMaster (false); + if (masterMonitor) + { + // IFF it is the failed master, delete, do not delete anything else because we could delete a new master + if (strcmp (masterMonitor, failedMaster) == 0) { - monitorLeaderPNid = 0; // restart with nid 0 + ZClient->WatchNodeMasterDelete (failedMaster); + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf( "%s@%d" " - (MasterMonitor) deleting master %s\n" + , method_name, __LINE__, masterMonitor ); + } } + // no worries + else + { + rc = ZClient->WatchMasterNode( masterMonitor ); + UpdateMonitorPort ( masterMonitor ); + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf( "%s@%d" " - (MasterMonitor) master did not match, set watch (rc = %d) and returning %s\n" + , method_name, __LINE__, rc, masterMonitor ); + } + TRACE_EXIT; + return; + } + } + + // choose a new master + if (((MyNode) && ((MyNode->GetState() != State_Up) ||(!IAmIntegrated))) || (MyNode == NULL /* not set up yet*/)) + { + // Do not let this monitor participate in choosing the master. It can wait until an integrated + // monitor makes a decision. + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf( "%s@%d" " - (MasterMonitor) This Node is not set up yet and will not participate in master choice!\n" + , method_name, __LINE__); + } + + // wait until another monitor choose a master + const char *masterMonitor = ZClient->WaitForAndReturnMaster (true); + if (masterMonitor) + { + rc = ZClient->WatchMasterNode( masterMonitor ); + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf("%s@%d" " (MasterMonitor) AssignMonitorLeader WatchMasterNode with rc = %d\n", method_name, __LINE__, rc); + } + + UpdateMonitorPort ( masterMonitor ); + } + TRACE_EXIT; + return; + } + + // For all monitors who are up - choose the master using the same logic + for (i=0; iGetPNid(); - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) { - trace_printf("%s@%d" " - Node " "%d" " is the new monitorLeaderPNid_." "\n", method_name, __LINE__, monitorLeaderPNid_); + trace_printf("%s@%d" " - Node " "%d" " is the new monitorLeaderPNid." "\n", method_name, __LINE__, node->GetPNid()); } - if (ZClientEnabled) + const char *masterMonitor = ZClient->WaitForAndReturnMaster (false); + + //nobody has written it yet, we don't want to overwrite anything + if (!masterMonitor) { rc = ZClient->CreateMasterZNode ( node->GetName() ); if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) { trace_printf("%s@%d" " (MasterMonitor) AssignMonitorLeader CreateMasterZNode with rc = %d\n", method_name, __LINE__, rc); } + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Master Monitor is %s on node %d\n" + , method_name, node->GetName(), node->GetPNid() ); + mon_log_write(MON_CLUSTER_ASSIGNMONITORLEADER_2, SQ_LOG_INFO, buf); + if ( (rc == ZOK) || (rc == ZNODEEXISTS) ) { - if ( IsAgentMode ) - { - rc = ZClient->WatchMasterNode( node->GetName( ) ); - if ( trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC) ) - { - trace_printf( "%s@%d" " (MasterMonitor) AssignMonitorLeader WatchMasterNode with rc = %d\n", method_name, __LINE__, rc ); - } - } + rc = ZClient->WatchMasterNode( node->GetName() ); + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf("%s@%d" " (MasterMonitor) AssignMonitorLeader WatchMasterNode with rc = %d\n", method_name, __LINE__, rc); + } } else { if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) { - trace_printf("%s@%d" " (MasterMonitor) AssignMonitorLeader Unable to set create or set watch\n", method_name, __LINE__); + trace_printf("%s@%d" " (MasterMonitor) AssignMonitorLeader Unable to create or set watch\n", method_name, __LINE__); } char buf[MON_STRING_BUF_SIZE]; snprintf( buf, sizeof(buf) - , "[%s], Unable to set create or set watch on master node %s\n" + , "[%s], Unable to create or set watch on master node %s\n" , method_name, node->GetName() ); - mon_log_write(MON_ZCLIENT_CREATEORSETMASTERWATCH, SQ_LOG_ERR, buf); + mon_log_write(MON_CLUSTER_ASSIGNMONITORLEADER_3, SQ_LOG_ERR, buf); } + } + else + { + rc = ZClient->WatchMasterNode( masterMonitor ); + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Master Monitor is %s\n" + , method_name, masterMonitor); + mon_log_write(MON_CLUSTER_ASSIGNMONITORLEADER_4, SQ_LOG_INFO, buf); + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf("%s@%d" " (MasterMonitor) AssignMonitorLeader WatchMasterNode with rc = %d\n", method_name, __LINE__, rc); + } } break; @@ -579,7 +693,7 @@ void CCluster::AssignTmLeader( int pnid, bool checkProcess ) { if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) { - trace_printf( "%s@%d - Getting process from Name Server, nid=%d, type%s\n" + trace_printf( "%s@%d - Getting process from Name Server, nid=%d, type=%s\n" , method_name, __LINE__ , tmLeaderNid_, ProcessTypeString(ProcessType_DTM) ); } @@ -711,7 +825,6 @@ CCluster::CCluster (void) configPNodesCount_ (-1), configPNodesMax_ (-1), nodeMap_ (NULL), - monitorLeaderPNid_ (-1), #ifndef NAMESERVER_PROCESS tmLeaderNid_ (-1), tmReadyCount_(0), @@ -1132,7 +1245,7 @@ void CCluster::HardNodeDown (int pnid, bool communicate_state) if ( Emulate_Down ) { IAmIntegrated = false; - AssignLeaders(pnid, false); + AssignLeaders(pnid, node->GetName(), false); } TRACE_EXIT; @@ -1237,7 +1350,7 @@ void CCluster::SoftNodeDown( int pnid ) } IAmIntegrated = false; - AssignLeaders(pnid, false); + AssignLeaders(pnid, node->GetName(), false); TRACE_EXIT; } @@ -2044,6 +2157,7 @@ void CCluster::DoDeviceReq(char * ldevName) } #endif +#ifdef EXCHANGE_CPU_SCHEDULING_DATA void CCluster::SaveSchedData( struct internal_msg_def *recv_msg ) { const char method_name[] = "CCluster::SaveSchedData"; @@ -2081,6 +2195,7 @@ void CCluster::SaveSchedData( struct internal_msg_def *recv_msg ) TRACE_EXIT; } +#endif void CCluster::HandleOtherNodeMsg (struct internal_msg_def *recv_msg, int pnid) @@ -2550,7 +2665,9 @@ void CCluster::HandleOtherNodeMsg (struct internal_msg_def *recv_msg, #endif case InternalType_SchedData: +#ifdef EXCHANGE_CPU_SCHEDULING_DATA SaveSchedData( recv_msg ); +#endif break; case InternalType_Set: @@ -3205,7 +3322,7 @@ void CCluster::InitializeConfigCluster( void ) const char method_name[] = "CCluster::InitializeConfigCluster"; TRACE_ENTRY; - int worldSize; + int worldSize = 0; MPI_Comm_size (MPI_COMM_WORLD, &worldSize); #ifdef NAMESERVER_PROCESS if ( !IsRealCluster ) @@ -3218,21 +3335,25 @@ void CCluster::InitializeConfigCluster( void ) } } #endif - int rankToPnid[worldSize]; CClusterConfig *clusterConfig = Nodes->GetClusterConfig(); + configPNodesCount_ = clusterConfig->GetPNodesCount(); + int rankToPnid[configPNodesCount_]; #ifdef NAMESERVER_PROCESS currentNodes_ = 1; // non-master Name Servers join set through master Name Server #else - currentNodes_ = worldSize; -#endif - - if ( IsRealCluster ) + if (IAmIntegrating || IsAgentMode) { - configPNodesCount_ = clusterConfig->GetPNodesCount(); + currentNodes_ = 1; // non-master monitors join cluster through master monitor } else { + currentNodes_ = worldSize; + } +#endif + + if ( !IsRealCluster ) + { // Set virtual cluster size to collective size MPI_Comm_size (MPI_COMM_WORLD, &configPNodesCount_); #ifdef NAMESERVER_PROCESS @@ -3248,6 +3369,14 @@ void CCluster::InitializeConfigCluster( void ) upNodes_.upNodes[i/MAX_NODE_BITMASK] |= (1ull << (i%MAX_NODE_BITMASK)); } } + else + { + for (int i=0; iGetFirstNid(); int TmLeaderPNid = LNode[tmLeaderNid_]->GetNode()->GetPNid(); -#endif // Any nodes not in the initial MPI_COMM_WORLD are down. for (int i=0; iGetNode(indexToPnid_[i]); if ( node ) node->SetState( State_Down ); - -#ifndef NAMESERVER_PROCESS // assign new TmLeader if TMLeader node is dead. if (TmLeaderPNid == indexToPnid_[i]) { AssignTmLeader(indexToPnid_[i], false); } -#endif } else { // Set bit indicating node is up @@ -3481,20 +3605,16 @@ void CCluster::InitializeConfigCluster( void ) } } } -#ifndef NAMESERVER_PROCESS else { tmLeaderNid_ = 0; } -#endif // Initialize communicators for point-to-point communications int myRank; MPI_Comm_rank( MPI_COMM_WORLD, &myRank ); -#ifdef NAMESERVER_PROCESS if ( !IsRealCluster ) myRank = MyPNID; -#endif InitClusterComm(worldSize, myRank, rankToPnid); if ( CommType == CommType_Sockets ) @@ -3844,6 +3964,43 @@ bool CCluster::PingSockPeer(CNode *node) const char method_name[] = "CCluster::PingSockPeer"; TRACE_ENTRY; + static int sv_connect_wait_timeout = -2; + static int sv_connect_retry_count = 1; + if ( sv_connect_wait_timeout == -2 ) + { + // Use the EPOLL timeout and retry values + char *lv_connect_wait_timeout_env = getenv( "SQ_MON_EPOLL_WAIT_TIMEOUT" ); + if ( lv_connect_wait_timeout_env ) + { + // Timeout in seconds + sv_connect_wait_timeout = atoi( lv_connect_wait_timeout_env ); + char *lv_connect_retry_count_env = getenv( "SQ_MON_EPOLL_RETRY_COUNT" ); + if ( lv_connect_retry_count_env ) + { + sv_connect_retry_count = atoi( lv_connect_retry_count_env ); + } + if ( sv_connect_retry_count > 180 ) + { + sv_connect_retry_count = 180; + } + } + else + { + // default to 64 seconds + sv_connect_wait_timeout = 16; + sv_connect_retry_count = 4; + } + + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s@%d] Ping connect timeout wait_timeout=1 second, retry_count=%d\n" + , method_name + , __LINE__ + , (sv_connect_retry_count * sv_connect_wait_timeout) ); + + mon_log_write( MON_PINGSOCKPEER_3, SQ_LOG_INFO, buf ); + } + bool rs = true; int rc; int pingSock = -1; @@ -3855,10 +4012,12 @@ bool CCluster::PingSockPeer(CNode *node) , node->GetName(), node->GetPNid() ); } - // Attempt to connect with remote monitor - for (int i = 0; i < MAX_RECONN_PING_RETRY_COUNT; i++ ) + // Attempt to connect with remote monitor in one seconds increments + // to recover as quickly as possible or give up trying + for (int i = 0; i < (sv_connect_retry_count*sv_connect_wait_timeout); i++ ) { - pingSock = Monitor->Connect( node->GetCommPort() ); + // Disable internal retries + pingSock = Monitor->Connect( node->GetCommPort(), false ); if ( pingSock < 0 ) { if (node->GetState() != State_Up) @@ -3873,7 +4032,14 @@ bool CCluster::PingSockPeer(CNode *node) } break; } - sleep( MAX_RECONN_PING_WAIT_TIMEOUT ); + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s@%d] Retrying connect to remote monitor %s, pnid=%d, retry=%d\n" + , method_name + , __LINE__ + , node->GetName(), node->GetPNid(), i ); + mon_log_write( MON_PINGSOCKPEER_4, SQ_LOG_INFO, buf ); + sleep( 1 ); } else { @@ -4334,9 +4500,13 @@ void CCluster::ReIntegrateSock( int initProblem ) if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) { - trace_printf( "%s@%d - Connected to creator monitor, sending my info, " - "node %d (%s), commPort=%s, syncPort=%s, creator=%d, " - "creatorShellPid=%d:%d, ping=%d\n" + trace_printf( "%s@%d - Connected to creator monitor, sending my info:\n" + " myNodeInfo.pnid=%d (%s)\n" + " myNodeInfo.commPort=%s\n" + " myNodeInfo.syncPort=%s\n" + " myNodeInfo.creator=%d\n" + " myNodeInfo.creatorShellPid=%d:%d\n" + " myNodeInfo.ping=%d\n" , method_name, __LINE__ , myNodeInfo.pnid , myNodeInfo.nodeName @@ -4452,7 +4622,6 @@ void CCluster::ReIntegrateSock( int initProblem ) pch1 = strtok (commPort,":"); pch1 = strtok (NULL,":"); Node[nodeInfo[i].pnid]->SetCommSocketPort( atoi(pch1) ); - Node[nodeInfo[i].pnid]->SetSyncPort( syncPort ); pch2 = strtok (syncPort,":"); pch2 = strtok (NULL,":"); @@ -5193,7 +5362,7 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St int err = MPI_SUCCESS; peer_t p[GetConfigPNodesMax()]; memset( p, 0, sizeof(p) ); - tag = 0; // make compiler happy + tag = tag; // make compiler happy // Set to twice the ZClient session timeout static int sessionTimeout = ZClientEnabled ? (ZClient->GetSessionTimeout() * 2) : 120; @@ -5855,6 +6024,11 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats, bool reestablishConnect , idst , ErrorMsg(stats[idst].MPI_ERROR) ); } + + --currentNodes_; + // Clear bit in set of "up nodes" + upNodes_.upNodes[idst/MAX_NODE_BITMASK] &= ~(1ull << (idst%MAX_NODE_BITMASK)); + // Remove old socket from epoll set, it may not be there struct epoll_event event; event.data.fd = socks_[idst]; @@ -5864,6 +6038,12 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats, bool reestablishConnect } continue; } + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Pinging Node %s (%d) to see if it's up\n" + , method_name, __LINE__ + , node->GetName(), node->GetPNid() ); + } if (PingSockPeer(node)) { reconnectSock = ConnectSockPeer( node, idst, reestablishConnections ); @@ -5896,6 +6076,11 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats, bool reestablishConnect , node->GetName(), node->GetPNid() , idst, socks_[idst] ); } + + --currentNodes_; + // Clear bit in set of "up nodes" + upNodes_.upNodes[idst/MAX_NODE_BITMASK] &= ~(1ull << (idst%MAX_NODE_BITMASK)); + // Remove old socket from epoll set, it may not be there struct epoll_event event; event.data.fd = socks_[idst]; @@ -5949,6 +6134,11 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats, bool reestablishConnect , idst , ErrorMsg(stats[idst].MPI_ERROR) ); } + + --currentNodes_; + // Clear bit in set of "up nodes" + upNodes_.upNodes[idst/MAX_NODE_BITMASK] &= ~(1ull << (idst%MAX_NODE_BITMASK)); + // Remove old socket from epoll set, it may not be there struct epoll_event event; event.data.fd = socks_[idst]; @@ -5996,6 +6186,11 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats, bool reestablishConnect , node->GetName(), node->GetPNid() , idst, socks_[idst] ); } + + --currentNodes_; + // Clear bit in set of "up nodes" + upNodes_.upNodes[idst/MAX_NODE_BITMASK] &= ~(1ull << (idst%MAX_NODE_BITMASK)); + // Remove old socket from epoll set, it may not be there struct epoll_event event; event.data.fd = socks_[idst]; @@ -6024,13 +6219,14 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats, bool reestablishConnect } if ( idst >= 0 && reconnectSock != -1 + && socks_[idst] != -1 && fcntl( socks_[idst], F_SETFL, O_NONBLOCK ) ) { err = MPI_ERR_AMODE; char ebuff[256]; char buf[MON_STRING_BUF_SIZE]; - snprintf( buf, sizeof(buf), "[%s@%d] fcntl(NONBLOCK) error: %s\n", - method_name, __LINE__, strerror_r( errno, ebuff, 256 ) ); + snprintf( buf, sizeof(buf), "[%s@%d] fcntl(socks_[%d]=%d,F_SETFL,NONBLOCK) error: %s\n", + method_name, __LINE__,idst, socks_[idst], strerror_r( errno, ebuff, 256 ) ); mon_log_write( MON_CLUSTER_ALLGATHERSOCKRECONN_1, SQ_LOG_CRIT, buf ); } } @@ -6704,7 +6900,7 @@ void CCluster::HandleDownNode( int pnid ) trace_printf("%s@%d - Added down node to list, pnid=%d, name=(%s)\n", method_name, __LINE__, downNode->GetPNid(), downNode->GetName()); // assign new leaders if needed - AssignLeaders(pnid, false); + AssignLeaders( pnid, downNode->GetName(), false ); // Build available list of spare nodes CNode *spareNode; @@ -7410,20 +7606,50 @@ bool CCluster::checkIfDone ( ) const char method_name[] = "CCluster::checkIfDone"; TRACE_ENTRY; -#ifdef NAMESERVER_PROCESS int nameServerCount = 0; CClusterConfig *clusterConfig = Nodes->GetClusterConfig(); - CNameServerConfigContainer *nameServerConfig = NULL; + CNameServerConfigContainer *nameServerConfigContainer = NULL; + +#ifdef NAMESERVER_PROCESS if (clusterConfig) { - nameServerConfig = Nodes->GetNameServerConfig(); - if (nameServerConfig) + nameServerConfigContainer = Nodes->GetNameServerConfig(); + if (nameServerConfigContainer) + { + nameServerCount = nameServerConfigContainer->GetCount(); + } + } +#else + int myNameServerCount = 0; + CNameServerConfig *nameServerConfig = NULL; + + if (NameServerEnabled && clusterConfig) + { + nameServerConfigContainer = Nodes->GetNameServerConfig(); + if (nameServerConfigContainer) { - nameServerCount = nameServerConfig->GetCount(); + nameServerCount = nameServerConfigContainer->GetCount(); + if (IsRealCluster) + { + nameServerConfig = nameServerConfigContainer->GetConfig( Node_name ); + if (nameServerConfig) + { + myNameServerCount = 1; + } + } + else + { + if (nameServerCount && MyPNID < nameServerCount) + { + myNameServerCount = 1; + } + } } } +#endif +#ifdef NAMESERVER_PROCESS if (trace_settings & (TRACE_PROCESS | TRACE_PROCESS_DETAIL | TRACE_SYNC)) trace_printf("%s@%d - Node %d shutdown level=%d, state=%s. Process " "count=%d, internal state=%d, currentNodes_=%d, " @@ -7508,10 +7734,8 @@ bool CCluster::checkIfDone ( ) // let the watchdog process exit HealthCheck.setState(MON_EXIT_PRIMITIVES); } - else if ( (Nodes->ProcessCount() <= - (currentNodes_ * (MAX_PRIMITIVES)) ) // only Name Servers alive - && (MyNode->GetNumProcs() <= - (MAX_PRIMITIVES) ) // only Name Servers alive + else if ( (MyNode->GetNumProcs() <= // only My Name Server alive + myNameServerCount ) && !MyNode->isInQuiesceState() // post-quiescing will // expire WDG (cluster) && !waitForNameServerExit_ ) // Name Server not yet exiting @@ -8852,7 +9076,7 @@ int CCluster::AcceptSock( int sock ) return ( csock ); } -int CCluster::Connect( const char *portName ) +int CCluster::Connect( const char *portName, bool doRetries ) { const char method_name[] = "CCluster::Connect"; TRACE_ENTRY; @@ -8885,7 +9109,6 @@ int CCluster::Connect( const char *portName ) int len = colon - portName; host[len] = '\0'; port = atoi(&colon[1]); - size = sizeof(sockinfo); if ( !retries ) @@ -8937,15 +9160,29 @@ int CCluster::Connect( const char *portName ) { if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) { - trace_printf( "%s@%d - Connecting to %s addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n" - , method_name, __LINE__ - , host - , (int)((unsigned char *)he->h_addr)[0] - , (int)((unsigned char *)he->h_addr)[1] - , (int)((unsigned char *)he->h_addr)[2] - , (int)((unsigned char *)he->h_addr)[3] - , port - , connect_failures ); + if (doRetries) + { + trace_printf( "%s@%d - Connecting to %s, addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n" + , method_name, __LINE__ + , portName + , (int)((unsigned char *)he->h_addr)[0] + , (int)((unsigned char *)he->h_addr)[1] + , (int)((unsigned char *)he->h_addr)[2] + , (int)((unsigned char *)he->h_addr)[3] + , port + , connect_failures ); + } + else + { + trace_printf( "%s@%d - Connecting to %s, addr=%d.%d.%d.%d, port=%d\n" + , method_name, __LINE__ + , portName + , (int)((unsigned char *)he->h_addr)[0] + , (int)((unsigned char *)he->h_addr)[1] + , (int)((unsigned char *)he->h_addr)[2] + , (int)((unsigned char *)he->h_addr)[3] + , port ); + } } ret = connect( sock, (struct sockaddr *) &sockinfo, size ); @@ -8965,8 +9202,8 @@ int CCluster::Connect( const char *portName ) { char la_buf[MON_STRING_BUF_SIZE]; int err = errno; - sprintf( la_buf, "[%s], connect() failed! errno=%d (%s)\n" - , method_name, err, strerror( err )); + sprintf( la_buf, "[%s], connect(%s) failed! errno=%d (%s)\n" + , method_name, portName, err, strerror( err )); mon_log_write(MON_CLUSTER_CONNECT_3, SQ_LOG_ERR, la_buf); close(sock); return ( -1 ); @@ -8975,6 +9212,12 @@ int CCluster::Connect( const char *portName ) if ( ret == 0 ) break; + if (doRetries == false) + { + close( sock ); + return( -1 ); + } + // For large clusters, the connect/accept calls seem to fail occasionally, // no doubt do to the large number (1000's) of simultaneous connect packets // flooding the network at once. So, we retry up to HPMP_CONNECT_RETRIES @@ -8984,8 +9227,8 @@ int CCluster::Connect( const char *portName ) if ( ++outer_failures > retries ) { char la_buf[MON_STRING_BUF_SIZE]; - sprintf( la_buf, "[%s], connect() exceeded retries! count=%d\n" - , method_name, retries); + sprintf( la_buf, "[%s], connect(%s) exceeded retries! count=%d\n" + , method_name, portName, retries); mon_log_write(MON_CLUSTER_CONNECT_4, SQ_LOG_ERR, la_buf); close( sock ); return ( -1 ); http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/cluster.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/cluster.h b/core/sqf/monitor/linux/cluster.h index 49939ac..9d0925f 100644 --- a/core/sqf/monitor/linux/cluster.h +++ b/core/sqf/monitor/linux/cluster.h @@ -111,7 +111,7 @@ public: #else int AcceptPtPSock( void ); #endif - int Connect( const char *portName ); + int Connect( const char *portName, bool doRetries = true ); void Connect( int socketPort ); #ifdef NAMESERVER_PROCESS void ConnectToMon2NsCommSelf( void ); @@ -127,11 +127,12 @@ public: #ifndef USE_BARRIER void ArmWakeUpSignal (void); #endif - void AssignLeaders( int pnid, bool checkProcess ); + void AssignLeaders( int pnid, const char *failedMaster, bool checkProcess ); #ifndef NAMESERVER_PROCESS void AssignTmLeader( int pnid, bool checkProcess ); #endif - void AssignMonitorLeader( int pnid ); + void AssignMonitorLeader( const char* failedMaster ); + void UpdateMonitorPort (const char* newMaster); void stats(); void CompleteSyncCycle() { syncCycle_.lock(); syncCycle_.wait(); syncCycle_.unlock(); } @@ -146,8 +147,6 @@ public: inline int GetTmLeader( void ) { return( tmLeaderNid_ ); } inline void SetTmLeader( int tmLeaderNid ) { tmLeaderNid_ = tmLeaderNid; } #endif - inline int GetMonitorLeader( void ) { return( monitorLeaderPNid_); } - inline void SetMonitorLeader( int monitorLeaderPNid ) { monitorLeaderPNid_ = monitorLeaderPNid; } int GetDownedNid( void ); #ifndef NAMESERVER_PROCESS inline int GetTmSyncPNid( void ) { return( tmSyncPNid_ ); } // Physical Node ID of current TmSync operations master @@ -196,7 +195,9 @@ public: void NodeTmReady( int nid ); #endif bool isMonSyncResponsive() { return monSyncResponsive_; } +#ifdef EXCHANGE_CPU_SCHEDULING_DATA void SaveSchedData( struct internal_msg_def *recv_msg ); +#endif bool IsNodeDownDeathNotices() { return nodeDownDeathNotices_; } int ReceiveMPI(char *buf, int size, int source, MonXChngTags tag, MPI_Comm comm); @@ -275,7 +276,6 @@ private: int configPNodesCount_; // # of physical nodes configured int configPNodesMax_; // max # of physical nodes that can be configured int *nodeMap_; // Mapping of Node ranks to COMM_WORLD ranks - int monitorLeaderPNid_; // PNid of currently assigned Monitor leader node #ifndef NAMESERVER_PROCESS int tmLeaderNid_; // Nid of currently assigned TM Leader node int tmReadyCount_; // # of DTM processes ready for transactions http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/commaccept.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/commaccept.cxx b/core/sqf/monitor/linux/commaccept.cxx index 8069653..5c4e3a5 100644 --- a/core/sqf/monitor/linux/commaccept.cxx +++ b/core/sqf/monitor/linux/commaccept.cxx @@ -664,9 +664,9 @@ void CCommAccept::processNewSock( int joinFd ) char buf[MON_STRING_BUF_SIZE]; snprintf( buf, sizeof(buf) - , "[%s], got connection from node %s. " + , "[%s], got connection from node %s (pnid=%d). " "Node not down, node state=%s\n" - , method_name, nodeId.nodeName + , method_name, nodeId.nodeName, nodeId.pnid , StateString(node->GetState())); mon_log_write(MON_COMMACCEPT_10, SQ_LOG_ERR, buf); http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/healthcheck.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/healthcheck.cxx b/core/sqf/monitor/linux/healthcheck.cxx index d72b99c..1f344fc 100644 --- a/core/sqf/monitor/linux/healthcheck.cxx +++ b/core/sqf/monitor/linux/healthcheck.cxx @@ -115,6 +115,18 @@ CHealthCheck::CHealthCheck() quiesceTimeoutSec_ = atoi(quiesceTimeoutC); } +#ifdef NAMESERVER_PROCESS + cpuSchedulingDataEnabled_ = false; +#else + cpuSchedulingDataEnabled_ = true; + char *env; + env = getenv("SQ_CPUSCHEDULINGDATA_ENABLED"); + if ( env && isdigit(*env) ) + { + cpuSchedulingDataEnabled_ = atoi(env); + } +#endif + if (trace_settings & TRACE_HEALTH) trace_printf("%s@%d quiesceTimeoutSec_ = %d, syncTimeoutSec_ = %d, workerTimeoutSec_ = %d\n", method_name, __LINE__, quiesceTimeoutSec_, CMonitor::SYNC_MAX_RESPONSIVE, CReqQueue::REQ_MAX_RESPONSIVE); @@ -242,9 +254,16 @@ void CHealthCheck::healthCheckThread() mem_log_write(MON_HEALTHCHECK_WAKEUP_2, (int)(currTime_.tv_sec - wakeupTimeSaved_)); } - // Replicate the clone to other nodes - CReplSchedData *repl = new CReplSchedData(); - Replicator.addItem(repl); +#ifndef NAMESERVER_PROCESS +#ifdef EXCHANGE_CPU_SCHEDULING_DATA + if (cpuSchedulingDataEnabled_) + { + // Replicate this host's CPU scheduling data to other nodes + CReplSchedData *repl = new CReplSchedData(); + Replicator.addItem(repl); + } +#endif +#endif state = state_; http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/healthcheck.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/healthcheck.h b/core/sqf/monitor/linux/healthcheck.h index c8e15fe..e637400 100644 --- a/core/sqf/monitor/linux/healthcheck.h +++ b/core/sqf/monitor/linux/healthcheck.h @@ -107,6 +107,7 @@ private: bool checkReqResponsive_; // should req thread be checked for responsiveness or not int monSyncTimeout_; // timeout (in secs) for sync thread responsiveness int refreshCounter_; // monitor heartbeats, updated every second. + bool cpuSchedulingDataEnabled_; // monitors exchange CPU scheduling data when enabled }; #endif http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/internal.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/internal.h b/core/sqf/monitor/linux/internal.h index 6d28b29..6329fc7 100644 --- a/core/sqf/monitor/linux/internal.h +++ b/core/sqf/monitor/linux/internal.h @@ -209,7 +209,8 @@ typedef struct { int nid; // Nid id of process to receive io data int pid; // Process id of process to receive io data - StdIoType ioType; + Verifier_t verifier; // Verifier of the process to receive io data + StdIoType ioType; int length; // Length in byte of Data buffer used char data[MAX_SYNC_DATA]; // Data to be replicated across cluster } ioData_t; @@ -225,9 +226,10 @@ struct stdin_req_def { int nid; // Node id of process requesting stdin data int pid; // Process id of process requesting stdin data + Verifier_t verifier; StdinReqType reqType; int supplier_nid; // Node id of process supplying stdin data - int supplier_pid; // Process id of process to supplying stdin data + int supplier_pid; // Process id of process to supplying stdin data }; struct kill_def @@ -375,6 +377,7 @@ typedef struct long long cpu_soft_irq; // Time in software interrupt } ProcStat_t; +#ifdef EXCHANGE_CPU_SCHEDULING_DATA struct scheddata_def { int PNid; // Node ID of scheduling data @@ -391,6 +394,7 @@ struct scheddata_def ProcStat_t proc_stats[MAX_LNODES_PER_NODE]; // Per logical node processor statistics unsigned int btime; // Node boot time }; +#endif struct set_def @@ -468,7 +472,9 @@ struct internal_msg_def struct spare_def activate_spare; struct uniqstr_def uniqstr; struct shutdown_def shutdown; +#ifdef EXCHANGE_CPU_SCHEDULING_DATA struct scheddata_def scheddata; +#endif char buffer[MAX_SYNC_SIZE-MSG_HDR_SIZE]; // Limit entire buffer to MAX_SYNC_SIZE } u; } __attribute__((__may_alias__)); http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/monitor.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/monitor.cxx b/core/sqf/monitor/linux/monitor.cxx index 933dee5..17ed27d 100755 --- a/core/sqf/monitor/linux/monitor.cxx +++ b/core/sqf/monitor/linux/monitor.cxx @@ -742,37 +742,42 @@ int CMonitor::PackProcObjs( char *&buffer ) char *bufPtr = buffer; - // first copy all primary and generic processes - lnode = Nodes->GetFirstLNode(); - for ( ; lnode ; lnode = lnode->GetNext() ) +#ifndef NAMESERVER_PROCESS + if (!NameServerEnabled) +#endif { - process = lnode->GetFirstProcess(); - while (process) + // first copy all primary and generic processes + lnode = Nodes->GetFirstLNode(); + for ( ; lnode ; lnode = lnode->GetNext() ) { - if (!process->IsBackup()) + process = lnode->GetFirstProcess(); + while (process) { - buffer = ProcCopy(buffer, process); - ++procCount; + if (!process->IsBackup()) + { + buffer = ProcCopy(buffer, process); + ++procCount; + } + + process = process->GetNext(); } - - process = process->GetNext(); } - } - - // copy all the backup processes - lnode = Nodes->GetFirstLNode(); - for ( ; lnode ; lnode = lnode->GetNext() ) - { - process = lnode->GetFirstProcess(); - while (process) + + // copy all the backup processes + lnode = Nodes->GetFirstLNode(); + for ( ; lnode ; lnode = lnode->GetNext() ) { - if (process->IsBackup()) + process = lnode->GetFirstProcess(); + while (process) { - buffer = ProcCopy(buffer, process); - ++procCount; + if (process->IsBackup()) + { + buffer = ProcCopy(buffer, process); + ++procCount; + } + + process = process->GetNext(); } - - process = process->GetNext(); } } @@ -927,6 +932,24 @@ void CMonitor::StartPrimitiveProcesses( void ) } #endif +void HandleAssignMonitorLeader ( const char *failedMaster ) +{ + const char method_name[] = "HandleAssignMonitorLeader"; + TRACE_ENTRY; + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf("%s@%d HandleAssignMonitorLeader called for %s\n" + , method_name, __LINE__, failedMaster); + } + // only relevant in AgentMode + if (IsAgentMode) + { + Monitor->AssignMonitorLeader(failedMaster); + } + + TRACE_EXIT; +} + void HandleMyNodeExpiration( void ) { const char method_name[] = "HandleMyNodeExpiration"; @@ -1126,7 +1149,9 @@ int main (int argc, char *argv[]) char temp_fname[MAX_PROCESS_PATH]; #endif char buf[MON_STRING_BUF_SIZE]; +#ifndef NAMESERVER_PROCESS unsigned int initSleepTime = 1; // 1 second +#endif mallopt(M_ARENA_MAX, 4); // call to limit the number of arena's of monitor to 4.This call doesn't seem to have any effect ! @@ -1337,11 +1362,13 @@ int main (int argc, char *argv[]) genSnmpTrapEnabled = true; } +#ifndef NAMESERVER_PROCESS env = getenv("MON_INIT_SLEEP"); if ( env && isdigit(*env) ) { initSleepTime = atoi(env); } +#endif env = getenv("SQ_COLD_STANDBY_SPARE"); if ( env && isdigit(*env) ) @@ -1527,6 +1554,10 @@ int main (int argc, char *argv[]) if (IsAgentMode) { + if ( IsRealCluster ) + { + MyPNID = -1; + } CreatorShellPid = 1000; // per monitor.sh CreatorShellVerifier = 0; } @@ -1610,6 +1641,7 @@ int main (int argc, char *argv[]) } } } + if (trace_settings & TRACE_INIT) { trace_printf("%s@%d Using signal %d for normal processes " @@ -1691,6 +1723,12 @@ int main (int argc, char *argv[]) abort(); } + //Moved creation of the below to later on + //Nodes = new CNodeContainer (); + //Config = new CConfigContainer (); + //Monitor = new CMonitor (procTermSig); + + // Set up zookeeper and determine the master if ( IsAgentMode || IsRealCluster ) { @@ -1728,6 +1766,16 @@ int main (int argc, char *argv[]) if (masterMonitor) { strcpy (MasterMonitorName, masterMonitor); + + if (trace_settings & TRACE_INIT) + { + trace_printf("%s@%d (MasterMonitor) IsAgentMode = TRUE, masterMonitor from ZK: %s, Node_name: %s\n" + , method_name + , __LINE__ + , MasterMonitorName + , Node_name); + } + // unfortunately, we have to do this to see if we are the master before // other things are set up. This is how we must do that if (strcmp(Node_name, masterMonitor) == 0) @@ -1742,6 +1790,16 @@ int main (int argc, char *argv[]) else { strcpy (MasterMonitorName, ClusterConfig->GetConfigMasterByName()); + + if (trace_settings & TRACE_INIT) + { + trace_printf("%s@%d (MasterMonitor) IsAgentMode = TRUE, ConfigMasterMonitor: %s, Node_name:%s \n" + , method_name + , __LINE__ + , MasterMonitorName + , Node_name); + } + if (strcmp (Node_name, ClusterConfig->GetConfigMasterByName()) == 0) { IsMaster = true; @@ -1751,12 +1809,21 @@ int main (int argc, char *argv[]) IsMaster = false; } } - } #ifdef NAMESERVER_PROCESS else { strcpy (MasterMonitorName, ClusterConfig->GetConfigMasterByName()); + + if (trace_settings & TRACE_INIT) + { + trace_printf("%s@%d (MasterMonitor) IsAgentMode = TRUE, ConfigMasterMonitor: %s, Node_name:%s \n" + , method_name + , __LINE__ + , MasterMonitorName + , Node_name); + } + if ( IsRealCluster ) { if (strcmp (Node_name, ClusterConfig->GetConfigMasterByName()) == 0) @@ -1774,7 +1841,6 @@ int main (int argc, char *argv[]) } } #endif - } if (IsAgentMode) @@ -1783,7 +1849,9 @@ int main (int argc, char *argv[]) { #ifdef NAMESERVER_PROCESS if ( IsRealCluster ) + { MyPNID = -1; + } #else MyPNID = -1; #endif @@ -1859,6 +1927,7 @@ int main (int argc, char *argv[]) if (IsAgentMode) { + int monitorLead = -1; CNode *myNode = Nodes->GetNode(MyPNID); const char *masterMonitor = NULL; if (myNode == NULL) @@ -1894,11 +1963,13 @@ int main (int argc, char *argv[]) trace_printf("%s@%d (MasterMonitor) IsMaster=%d, masterNode=%s\n", method_name, __LINE__, IsMaster, masterNode->GetName() ); } } - Monitor->SetMonitorLeader( masterNode->GetPNid() ); - if (MyPNID == masterNode->GetPNid()) + monitorLead = masterNode->GetPNid(); + if (MyPNID == monitorLead) { + ZClient->WatchNodeMasterDelete (myNode->GetName() ); // just in case of stale info ZClient->CreateMasterZNode ( myNode->GetName() ); strcpy (MasterMonitorName, myNode->GetName()); + ZClient->WatchMasterNode( MasterMonitorName ); if (trace_settings & TRACE_INIT) { trace_printf("%s@%d (MasterMonitor) IsMaster=%d, set monitor lead to %d\n", method_name, __LINE__, IsMaster, MyPNID); @@ -1920,7 +1991,8 @@ int main (int argc, char *argv[]) { trace_printf("%s@%d (MasterMonitor) IsMaster=%d, set monitor lead to %d\n", method_name, __LINE__, IsMaster, masterNode->GetPNid()); } - Monitor->SetMonitorLeader( masterNode->GetPNid() ); + monitorLead = masterNode->GetPNid(); + ZClient->WatchMasterNode( MasterMonitorName ); } else { @@ -1936,6 +2008,20 @@ int main (int argc, char *argv[]) } } } +#ifdef NAMESERVER_PROCESS + else + { + if ( !IsRealCluster ) + { + monitorLead = 0; + } + } +#endif + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Master Monitor is on node %d\n" + , method_name, monitorLead); + mon_log_write(MON_MONITOR_MAIN_18, SQ_LOG_INFO, buf); } if (!IAmIntegrating) { @@ -2158,6 +2244,7 @@ int main (int argc, char *argv[]) // Programmer bonehead! abort(); } + if (port != NULL) { int myPortNum; http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/msgdef.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/msgdef.h b/core/sqf/monitor/linux/msgdef.h index 4b34557..ca76fc1 100644 --- a/core/sqf/monitor/linux/msgdef.h +++ b/core/sqf/monitor/linux/msgdef.h @@ -34,6 +34,7 @@ // Compile options //#define DEBUGGING #define NO_OPEN_CLOSE_NOTICES +#define EXCHANGE_CPU_SCHEDULING_DATA #define SERVICE_TAG 1 #define INTERNAL_TAG 2 http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/nameserver.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/nameserver.cxx b/core/sqf/monitor/linux/nameserver.cxx index d367085..690e23d 100644 --- a/core/sqf/monitor/linux/nameserver.cxx +++ b/core/sqf/monitor/linux/nameserver.cxx @@ -27,6 +27,7 @@ using namespace std; +#include #include #include #include @@ -41,6 +42,7 @@ using namespace std; #include #include #include +#include #include "lnode.h" #include "pnode.h" @@ -696,7 +698,12 @@ int CNameServer::SendReceive( struct message_def* msg ) size += sizeof(msg->u.request.u.process_info_cont); break; case ReqType_ProcessInfoNs: - descp = (char *) "process-info-ns"; + msginfo = &msg->u.request.u.process_info; + sprintf( desc, "process-info-ns (nid=%d, pid=%d, verifier=%d, name=%s)\n" + "\ttarget (nid=%d, pid=%d, verifier=%d, name=%s, type=%d)\n", + msginfo->nid, msginfo->pid, msginfo->verifier, msginfo->process_name, + msginfo->target_nid, msginfo->target_pid, msginfo->target_verifier, + msginfo->target_process_name, msginfo->type ); size += sizeof(msg->u.request.u.process_info); break; case ReqType_ShutdownNs: http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/nsprocess.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/nsprocess.cxx b/core/sqf/monitor/linux/nsprocess.cxx index 62523ae..dd2ab68 100644 --- a/core/sqf/monitor/linux/nsprocess.cxx +++ b/core/sqf/monitor/linux/nsprocess.cxx @@ -44,13 +44,15 @@ CProcess *CProcessContainer::CreateProcess (CProcess * parent, strId_t programStrId, char *infile, char *outfile, + void *tag, int &result) { - CProcess *process = NULL; - const char method_name[] = "CProcessContainer::CreateProcess"; TRACE_ENTRY; + CProcess *process = NULL; + + tag = tag; // Satisfy compiler result = MPI_SUCCESS; // load & normalize process name http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/nsreqnewproc.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/nsreqnewproc.cxx b/core/sqf/monitor/linux/nsreqnewproc.cxx index 2231fc0..eb06728 100644 --- a/core/sqf/monitor/linux/nsreqnewproc.cxx +++ b/core/sqf/monitor/linux/nsreqnewproc.cxx @@ -126,6 +126,7 @@ void CExtNewProcNsReq::performRequest() msg_->u.request.u.new_process_ns.programStrId, msg_->u.request.u.new_process_ns.infile, msg_->u.request.u.new_process_ns.outfile, + 0, // tag result ); if (process) http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/nsreqqueue.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/nsreqqueue.cxx b/core/sqf/monitor/linux/nsreqqueue.cxx index 3d62e7a..d10a963 100644 --- a/core/sqf/monitor/linux/nsreqqueue.cxx +++ b/core/sqf/monitor/linux/nsreqqueue.cxx @@ -36,7 +36,7 @@ void CRequest::monreply(struct message_def *msg, int sockFd, int *error) *error = 0; if (!msg->noreply) // send reply { - int size = offsetof(message_def, u.reply.u); + int size = offsetof(struct message_def, u.reply.u); switch (msg->u.reply.type) { case ReplyType_Generic: http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/pnode.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/pnode.cxx b/core/sqf/monitor/linux/pnode.cxx index 033a19a..563cf01 100644 --- a/core/sqf/monitor/linux/pnode.cxx +++ b/core/sqf/monitor/linux/pnode.cxx @@ -78,7 +78,6 @@ extern CHealthCheck HealthCheck; extern CMonTrace *MonTrace; extern bool IsAgentMode; extern bool IAmIntegrating; -extern char MasterMonitorName[MAX_PROCESS_PATH]; extern char Node_name[MPI_MAX_PROCESSOR_NAME]; extern CClusterConfig *ClusterConfig; @@ -115,8 +114,8 @@ extern CNameServerConfigContainer *NameServerConfig; // The following defines specify the default values for the HA // timers if the timer related environment variables are not defined. -// Defaults to 5 second Watchdog process timer expiration -#define WDT_KeepAliveTimerDefault 5 +// Defaults to 60 second Watchdog process timer expiration +#define WDT_KeepAliveTimerDefault 60 // Default interval used by GetSchedulingData (in milliseconds) unsigned long int CNode::minSchedDataInterval_ = 500; @@ -1251,6 +1250,7 @@ void CNode::StartNameServerProcess( void ) programStrId, (char *) "", //infile, stdout, //outfile, + 0, //tag result ); if ( NameServerProcess ) @@ -1331,6 +1331,7 @@ void CNode::StartWatchdogProcess( void ) programStrId, (char *) "", //infile, stdout, //outfile, + 0, //tag result ); if ( watchdogProcess ) @@ -1388,6 +1389,7 @@ void CNode::StartPStartDProcess( void ) programStrId, (char *) "", //infile, stdout, //outfile, + 0, //tag result ); if ( pstartdProcess ) @@ -1564,6 +1566,7 @@ void CNode::StartSMServiceProcess( void ) programStrId, (char *) "", //infile, stdout, //outfile, + 0, //tag result ); if ( smsProcess ) @@ -1652,6 +1655,7 @@ CNodeContainer::~CNodeContainer( void ) if (clusterConfig_) { delete clusterConfig_; + clusterConfig_ = NULL; } if (nameServerConfig_) { @@ -3202,6 +3206,31 @@ CProcess *CNodeContainer::GetProcessLByTypeNs( int nid, PROCESSTYPE type ) processInfo->target_process_pattern[0] = 0; processInfo->type = type; + if ( trace_settings & ( TRACE_PROCESS | TRACE_REQUEST) ) + { + trace_printf( "%s@%d - Received monitor request process-info-ns data.\n" + " process_info.nid=%d\n" + " process_info.pid=%d\n" + " process_info.verifier=%d\n" + " process_info.target_nid=%d\n" + " process_info.target_pid=%d\n" + " process_info.target_verifier=%d\n" + " process_info.target_process_name=%s\n" + " process_info.target_process_pattern=%s\n" + " process_info.type=%d\n" + , method_name, __LINE__ + , processInfo->nid + , processInfo->pid + , processInfo->verifier + , processInfo->target_nid + , processInfo->target_pid + , processInfo->target_verifier + , processInfo->target_process_name + , processInfo->target_process_pattern + , processInfo->type + ); + } + int error = NameServer->ProcessInfoNs(&msg); // in reqQueue thread (CExternalReq) if (error == 0) { http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/process.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/process.cxx b/core/sqf/monitor/linux/process.cxx index 5ecaf50..9b2be4f 100644 --- a/core/sqf/monitor/linux/process.cxx +++ b/core/sqf/monitor/linux/process.cxx @@ -586,8 +586,8 @@ void CProcess::procExitNotifierNodes( void ) const char method_name[] = "CProcess::procExitNotifierNodes"; TRACE_ENTRY; - CLNode *targetLNode; - CNode *targetNode; + CLNode *targetLNode = NULL; + CNode *targetNode = NULL; nidSet_t::iterator it; // Remove death notice registration for all entries on list @@ -1545,7 +1545,7 @@ void CProcess::setEnvFromRegistry ( char **envp, int &countEnv ) } #ifndef NAMESERVER_PROCESS -bool CProcess::Create (CProcess *parent, int & result) +bool CProcess::Create (CProcess *parent, void* tag, int & result) { bool monAltLogEnabled = false; bool seamonsterEnabled = false; @@ -2276,6 +2276,28 @@ bool CProcess::Create (CProcess *parent, int & result) // I'm the monitor ... connect to child rc = MPI_SUCCESS; + // Save process id and build process name if not already named + Pid = os_pid; + if (Name[0] == '\0') + { // No name assigned to the process so generate one based on + // the node-id and process-id. + MyNode->BuildOurName(Nid, os_pid, Name); + + if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) + trace_printf("%s@%d - No process name specified, generated name=%s for process (%d, %d)\n", method_name, __LINE__, Name, Nid, os_pid); + } + + if (NameServerEnabled && tag != NULL) + { + // Send actual pid and process name back to parent + // STDIO Redirection requires that clone process in parent node + // have the actual pid + PtpClient->ProcessInit( this + , tag + , 0 + , parent->Nid ); + } + if (trace_settings & (TRACE_PROCESS | TRACE_REDIRECTION)) trace_printf("%s@%d Process=%s, Infile=[%s], Outfile=[%s]\n", method_name, __LINE__, Name, infile_.c_str(), @@ -2342,14 +2364,6 @@ bool CProcess::Create (CProcess *parent, int & result) fd_stdout_ = pfds_stdout[0]; } - if (Name[0] == '\0') - { // No name assigned to the process so generate one based on - // the node-id and process-id. - MyNode->BuildOurName(Nid, os_pid, Name); - - if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) - trace_printf("%s@%d - No process name specified, generated name=%s for process (%d, %d)\n", method_name, __LINE__, Name, Nid, os_pid); - } // stderr pipe to child: // We don't need write end of pipe. // Add the read end of file descriptor to list of file @@ -2579,7 +2593,6 @@ bool CProcess::Create (CProcess *parent, int & result) if (rc == MPI_SUCCESS && result == MPI_SUCCESS) { successful = true; - Pid = os_pid; PidAtFork_ = os_pid; // Indicate that process exists but has not yet completed initialization. @@ -2939,28 +2952,47 @@ void CProcess::Exit( CProcess *parent ) SetState(State_Stopped); - if (parent && NameServerEnabled) + if (!Clone && parent && NameServerEnabled) { - ProcessInfoNs_reply_def processInfo; - int rc = Nodes->GetProcessInfoNs( parent->GetNid() - , parent->GetPid() - , parent->GetVerifier() - , &processInfo); - if (rc == MPI_ERR_NAME) - { - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + if (parent->GetNid() != GetNid()) + { // parent is remote + if (parent->childCount() == 0) + { // process is parent's last child + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf( "%s@%d - Parent's last child, deleting clone process %s, (%d,%d:%d)\n" + , method_name, __LINE__ + , parent->GetName() + , parent->GetNid() + , parent->GetPid() + , parent->GetVerifier() ); + } + Nodes->DeleteCloneProcess( parent ); + parent = NULL; + } + else { - trace_printf( "%s@%d - Deleting clone process %s, (%d,%d:%d)\n" - , method_name, __LINE__ - , parent->GetName() - , parent->GetNid() - , parent->GetPid() - , parent->GetVerifier() ); + ProcessInfoNs_reply_def processInfo; + int rc = Nodes->GetProcessInfoNs( parent->GetNid() + , parent->GetPid() + , parent->GetVerifier() + , &processInfo); + if (rc == MPI_ERR_NAME) + { // parent exited + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf( "%s@%d - Deleting clone process %s, (%d,%d:%d)\n" + , method_name, __LINE__ + , parent->GetName() + , parent->GetNid() + , parent->GetPid() + , parent->GetVerifier() ); + } + Nodes->DeleteCloneProcess( parent ); + parent = NULL; + } } - Nodes->DeleteCloneProcess( parent ); - parent = NULL; } - } // if the env is set to not deliver death messages upon node down, @@ -4624,7 +4656,8 @@ void CProcessContainer::Child_Exit ( CProcess * parent ) { if (NameServerEnabled) { - CNode *childNode = childNode->GetNode(); + CNode *childNode = NULL; + childNode = childNode->GetNode(); // Forward the process create to the target node PtpClient->ProcessKill( process , process->GetAbort() @@ -4883,6 +4916,7 @@ CProcess *CProcessContainer::CreateProcess (CProcess * parent, strId_t programStrId, char *infile, char *outfile, + void *tag, int &result) { CProcess *process = NULL; @@ -4954,7 +4988,7 @@ CProcess *CProcessContainer::CreateProcess (CProcess * parent, { process->userArgs ( monitorArgc, monitorArgv ); } - if (process->Create (parent, result)) // monitor + if (process->Create (parent, tag, result)) // monitor { AddToPidMap(process->GetPid(), process); } @@ -6356,7 +6390,7 @@ bool CProcessContainer::RestartPersistentProcess( CProcess *process, int downNid process->SetPriorPid( !MyNode->IsSpareNode() ? process->GetPid() : 0 ); process->SetClone( false ); int result; - successful = process->Create(parent, result); + successful = process->Create(parent, 0, result); if (successful) { process->SetAbended( false ); http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/process.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/process.h b/core/sqf/monitor/linux/process.h index f6f22cb..227836f 100644 --- a/core/sqf/monitor/linux/process.h +++ b/core/sqf/monitor/linux/process.h @@ -123,6 +123,7 @@ class CProcessContainer strId_t programStrId, char *infile, char *outfile + , void *tag , int & result ); bool Dump_Process( CProcess *dumper, CProcess *process, char *core_path ); @@ -237,7 +238,7 @@ class CProcess void CompleteDump(DUMPSTATUS status, char *core_file); void CompleteProcessStartup( char *port, int os_pid, bool event_messages, bool system_messages, bool preclone, struct timespec *creation_time, int origPNidNs ); void CompleteRequest( int status ); - bool Create (CProcess *parent, int & result); + bool Create (CProcess *parent, void* tag, int & result); bool Dump (CProcess *dumper, char *core_path); void DumpBegin(int nid, int pid, Verifier_t verifier, char *core_path); void DumpEnd(DUMPSTATUS status, char *core_file); http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/ptpclient.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/ptpclient.cxx b/core/sqf/monitor/linux/ptpclient.cxx index 03ca514..a88e2d2 100644 --- a/core/sqf/monitor/linux/ptpclient.cxx +++ b/core/sqf/monitor/linux/ptpclient.cxx @@ -27,6 +27,7 @@ using namespace std; +#include #include #include #include @@ -41,6 +42,7 @@ using namespace std; #include #include #include +#include #include "lnode.h" #include "pnode.h" @@ -195,7 +197,7 @@ int CPtpClient::ProcessClone( CProcess *process ) { if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { - trace_printf( "%s@%d - Not Sending InternalType_Clone request to parentNid=%d\n" + trace_printf( "%s@%d - Not Sending InternalType_Clone request to parentNid=%d" ", process=%s (%d:%d:%d)\n" , method_name, __LINE__ , process->GetParentNid() @@ -209,7 +211,7 @@ int CPtpClient::ProcessClone( CProcess *process ) if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { - trace_printf( "%s@%d - Sending InternalType_Clone request to %s, parentNid=%d\n" + trace_printf( "%s@%d - Sending InternalType_Clone request to %s, parentNid=%d" ", process=%s (%d:%d:%d)\n" , method_name, __LINE__ , parentLNode->GetNode()->GetName() @@ -397,7 +399,7 @@ int CPtpClient::ProcessInit( CProcess *process { if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { - trace_printf( "%s@%d - Not Sending InternalType_Clone request to parentNid=%d" + trace_printf( "%s@%d - Not Sending InternalType_ProcessInit request to parentNid=%d" ", process=%s (%d,%d:%d)\n" , method_name, __LINE__ , process->GetParentNid() @@ -412,14 +414,16 @@ int CPtpClient::ProcessInit( CProcess *process if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d" " - Sending InternalType_ProcessInit to parent node %s, parentNid=%d" - ", for process %s (%d,%d:%d)\n" + ", for process %s (%d,%d:%d), result=%d, tag=%p\n" , method_name, __LINE__ , parentLNode->GetNode()->GetName() , parentNid , process->GetName() , process->GetNid() , process->GetPid() - , process->GetVerifier() ); + , process->GetVerifier() + , result + , tag ); } struct internal_msg_def msg; @@ -923,3 +927,176 @@ int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg, int size, return error; } +int CPtpClient::StdInReq( int nid + , int pid + , StdinReqType type + , int supplierNid + , int supplierPid ) +{ + const char method_name[] = "CPtpClient::StdInReq"; + TRACE_ENTRY; + + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Sending InternalType_StdinReq request type =%d " + "from (%d,%d), for supplier (%d,%d)\n" + , method_name, __LINE__ + , type + , nid + , pid + , supplierNid + , supplierPid ); + } + + CLNode *lnode = Nodes->GetLNode( supplierNid ); + if (lnode == NULL) + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Can't find supplier node nid=%d " + "for stdin data request.\n" + , method_name + , supplierNid ); + mon_log_write(PTPCLIENT_STDINREQ_1, SQ_LOG_ERR, buf); + + TRACE_EXIT; + return -1; + } + + CProcess *process = lnode->GetProcessL( supplierPid ); + if (process == NULL) + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Can't find process nid=%d, " + "pid=%d for stdin data request.\n" + , method_name + , supplierNid + , supplierPid ); + mon_log_write(PTPCLIENT_STDINREQ_2, SQ_LOG_ERR, buf); + + TRACE_EXIT; + return -1; + } + + struct internal_msg_def msg; + memset(&msg, 0, sizeof(msg)); + msg.type = InternalType_StdinReq; + msg.u.stdin_req.nid = nid; + msg.u.stdin_req.pid = pid; + msg.u.stdin_req.reqType = type; + msg.u.stdin_req.supplier_nid = supplierNid; + msg.u.stdin_req.supplier_pid = supplierPid; + + int size = offsetof(struct internal_msg_def, u); + size += sizeof(msg.u.stdin_req); + + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL)) + { + trace_printf( "%s@%d - size_=%d, type =%d " + "from (%d,%d), for supplier (%d,%d)\n" + , method_name, __LINE__ + , size + , msg.u.stdin_req.reqType + , msg.u.stdin_req.nid + , msg.u.stdin_req.pid + , msg.u.stdin_req.supplier_nid + , msg.u.stdin_req.supplier_pid ); + } + + int error = SendToMon("stdin" + , &msg + , size + , process->GetNid() + , lnode->GetNode()->GetName()); + + TRACE_EXIT; + return error; +} + +int CPtpClient::StdIoData( int nid + , int pid + , StdIoType type + , ssize_t count + , char *data ) +{ + const char method_name[] = "CPtpClient::StdIoData"; + TRACE_ENTRY; + + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Sending InternalType_IoData request type =%d " + "to (%d,%d), count=%ld\n" + , method_name, __LINE__ + , type + , nid + , pid + , count ); + } + + CLNode *lnode = Nodes->GetLNode( nid ); + if (lnode == NULL) + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Can't find supplier node nid=%d " + "for stdin data request.\n" + , method_name + , nid ); + mon_log_write(PTPCLIENT_STDIODATA_1, SQ_LOG_ERR, buf); + + TRACE_EXIT; + return -1; + } + + CProcess *process = lnode->GetProcessL( pid ); + if (process == NULL) + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Can't find process nid=%d, " + "pid=%d for stdin data request.\n" + , method_name + , nid + , pid ); + mon_log_write(PTPCLIENT_STDIODATA_2, SQ_LOG_ERR, buf); + + TRACE_EXIT; + return -1; + } + + struct internal_msg_def msg; + memset(&msg, 0, sizeof(msg)); + msg.type = InternalType_IoData; + msg.u.iodata.nid = nid ; + msg.u.iodata.pid = pid ; + msg.u.iodata.ioType = type ; + msg.u.iodata.length = count; + memcpy(&msg.u.iodata.data, data, count); + + int size = offsetof(struct internal_msg_def, u); + size += sizeof(msg.u.iodata); + + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL)) + { + trace_printf( "%s@%d - size_=%d, type =%d " + "to (%d,%d), count=%d\n(%s)" + , method_name, __LINE__ + , size + , msg.u.iodata.ioType + , msg.u.iodata.nid + , msg.u.iodata.pid + , msg.u.iodata.length + , msg.u.iodata.length?msg.u.iodata.data:"\n" ); + } + + int error = SendToMon("stdio-data" + , &msg + , size + , process->GetNid() + , lnode->GetNode()->GetName()); + + TRACE_EXIT; + return error; +} +