trafodion-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sure...@apache.org
Subject [33/50] [abbrv] trafodion git commit: Scale testing fixes.
Date Sat, 16 Jun 2018 17:10:03 GMT
http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/ptpclient.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/ptpclient.h b/core/sqf/monitor/linux/ptpclient.h
index 87f2315..e6ddeb4 100644
--- a/core/sqf/monitor/linux/ptpclient.h
+++ b/core/sqf/monitor/linux/ptpclient.h
@@ -69,6 +69,16 @@ public:
                       , CProcess *targetProcess
                       , int targetNid
                       , const char *targetNodeName );
+    int  StdInReq( int nid
+                 , int pid
+                 , StdinReqType type
+                 , int supplierNid
+                 , int supplierPid );
+    int  StdIoData( int nid
+                  , int pid
+                  , StdIoType type
+                  , ssize_t count
+                  , char *data );
 
 private:
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/ptpcommaccept.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/ptpcommaccept.cxx b/core/sqf/monitor/linux/ptpcommaccept.cxx
index fa21dc0..d380d3a 100644
--- a/core/sqf/monitor/linux/ptpcommaccept.cxx
+++ b/core/sqf/monitor/linux/ptpcommaccept.cxx
@@ -23,9 +23,10 @@
 //
 ///////////////////////////////////////////////////////////////////////////////
 
-
 using namespace std;
 
+#include <stdio.h>
+#include "redirector.h"
 #include "ptpcommaccept.h"
 #include "monlogging.h"
 #include "montrace.h"
@@ -33,11 +34,13 @@ using namespace std;
 
 #include "reqqueue.h"
 
+extern CRedirector Redirector;
 extern CReqQueue ReqQueue;
 extern CPtpCommAccept PtpCommAccept;
 extern CMonitor *Monitor;
 extern CNode *MyNode;
 extern CNodeContainer *Nodes;
+extern CRedirector Redirector;
 extern int MyPNID;
 extern char MyPtPPort[MPI_MAX_PORT_NAME];
 extern char *ErrorMsg (int error_code);
@@ -46,9 +49,9 @@ extern CommType_t CommType;
 
 
 CPtpCommAccept::CPtpCommAccept()
-           : accepting_(true)
-           , shutdown_(false)
-           , thread_id_(0)
+               : accepting_(true)
+               , shutdown_(false)
+               , thread_id_(0)
 {
     const char method_name[] = "CPtpCommAccept::CPtpCommAccept";
     TRACE_ENTRY;
@@ -82,7 +85,7 @@ void CPtpCommAccept::processNewSock( int sockFd )
         char buf[MON_STRING_BUF_SIZE];
         snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new "
                      "monitor: %s.\n", method_name, ErrorMsg(rc));
-        mon_log_write(MON_COMMACCEPT_8, SQ_LOG_ERR, buf);    
+        mon_log_write(PTP_COMMACCEPT_1, SQ_LOG_ERR, buf);    
         return;
     }
     // Get info about connecting monitor
@@ -97,7 +100,7 @@ void CPtpCommAccept::processNewSock( int sockFd )
         char buf[MON_STRING_BUF_SIZE];
         snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new "
                  "monitor: %s.\n", method_name, ErrorMsg(rc));
-        mon_log_write(MON_COMMACCEPT_8, SQ_LOG_ERR, buf);    
+        mon_log_write(PTP_COMMACCEPT_2, SQ_LOG_ERR, buf);    
         return;
     }
     else
@@ -191,6 +194,26 @@ void CPtpCommAccept::processNewSock( int sockFd )
                 ReqQueue.enqueueKillReq( &msg.u.kill );
                 break;
             }
+            case InternalType_IoData:
+            {
+                if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+                {
+                    trace_printf( "%s@%d" " - Received InternalType_IoData\n"
+                                , method_name, __LINE__ );
+                }
+                ReqQueue.enqueueIoDataReq( &msg.u.iodata );
+                break;
+            }
+            case InternalType_StdinReq:
+            {
+                if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+                {
+                    trace_printf( "%s@%d" " - Received InternalType_StdinReq\n"
+                                , method_name, __LINE__ );
+                }
+                ReqQueue.enqueueStdInReq( &msg.u.stdin_req );
+                break;
+            }
             default:
             {
                 abort();
@@ -273,7 +296,7 @@ void CPtpCommAccept::commAcceptorSock()
             char buf[MON_STRING_BUF_SIZE];
             snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n",
                      method_name, strerror(errno));
-            mon_log_write(MON_COMMACCEPT_16, SQ_LOG_ERR, buf);
+            mon_log_write(PTP_COMMACCEPT_6, SQ_LOG_ERR, buf);
         }
         else
         {
@@ -330,7 +353,7 @@ static void *ptpCommAccept(void *arg)
         char buf[MON_STRING_BUF_SIZE];
         snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n",
                  method_name, rc);
-        mon_log_write(MON_COMMACCEPT_17, SQ_LOG_ERR, buf);
+        mon_log_write(PTP_COMMACCEPT_7, SQ_LOG_ERR, buf);
     }
 
     // Enter thread processing loop
@@ -353,7 +376,7 @@ void CPtpCommAccept::start()
         char buf[MON_STRING_BUF_SIZE];
         snprintf(buf, sizeof(buf), "[%s], thread create error=%d\n",
                  method_name, rc);
-        mon_log_write(MON_COMMACCEPT_18, SQ_LOG_ERR, buf);
+        mon_log_write(PTP_COMMACCEPT_8, SQ_LOG_ERR, buf);
     }
 
     TRACE_EXIT;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/redirector.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/redirector.cxx b/core/sqf/monitor/linux/redirector.cxx
index 54710bc..43bb231 100644
--- a/core/sqf/monitor/linux/redirector.cxx
+++ b/core/sqf/monitor/linux/redirector.cxx
@@ -58,6 +58,7 @@ using namespace std;
 #include "replicate.h"
 #include "monsonar.h"
 #include "reqqueue.h"
+#include "ptpclient.h"
 #endif
 
 #ifndef NAMESERVER_PROCESS
@@ -70,6 +71,8 @@ extern CNodeContainer *Nodes;
 extern CReplicate Replicator;
 extern CMonStats *MonStats;
 extern CReqQueue ReqQueue;
+extern CPtpClient *PtpClient;
+extern bool NameServerEnabled;
 #endif
 
 const char *EpollEventString( __uint32_t events )
@@ -662,10 +665,23 @@ int CRedirectAncestorStdin::handleInput()
             reqType = STDIN_FLOW_ON;
         }
 
-    CReplStdinReq *repl
-        = new CReplStdinReq(MyPNID, pid_, reqType, ancestorNid_, ancestorPid_ );
-    Replicator.addItem(repl);
-
+        if (NameServerEnabled)
+        {
+            PtpClient->StdInReq( MyPNID
+                               , pid_
+                               , reqType
+                               , ancestorNid_
+                               , ancestorPid_  );
+        }
+        else
+        {
+            CReplStdinReq *repl = new CReplStdinReq( MyPNID
+                                                   , pid_
+                                                   , reqType
+                                                   , ancestorNid_
+                                                   , ancestorPid_ );
+            Replicator.addItem(repl);
+        }
     }
 
     TRACE_EXIT;
@@ -856,9 +872,24 @@ void CRedirectStdinRemote::handleOutput(ssize_t count, char *buffer)
                      (int)count);
     }
 
-    CReplStdioData *repl
-        = new CReplStdioData(requesterNid_, pid_, STDIN_DATA, count, buffer );
-    Replicator.addItem(repl);
+    if (NameServerEnabled)
+    {
+        PtpClient->StdIoData( requesterNid_
+                            , pid_
+                            , STDIN_DATA
+                            , count
+                            , buffer );
+    }
+    else
+    {
+        CReplStdioData *repl = new CReplStdioData( requesterNid_
+                                                 , pid_
+                                                 , STDIN_DATA
+                                                 , count
+                                                 , buffer );
+        Replicator.addItem(repl);
+    }
+
     if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
        MonStats->StdinRemoteDataReplIncr();
 
@@ -1144,10 +1175,24 @@ void CRedirectAncestorStdout::handleOutput(ssize_t count, char *buffer)
                      (int)count);
     }
 
-    CReplStdioData *repl
-        = new CReplStdioData(ancestor_nid_, ancestor_pid_, STDOUT_DATA, 
-                              count, buffer );
-    Replicator.addItem(repl);
+    if (NameServerEnabled)
+    {
+        PtpClient->StdIoData( ancestor_nid_
+                            , ancestor_pid_
+                            , STDOUT_DATA
+                            , count
+                            , buffer );
+    }
+    else
+    {
+        CReplStdioData *repl = new CReplStdioData( ancestor_nid_
+                                                 , ancestor_pid_
+                                                 , STDOUT_DATA
+                                                 , count
+                                                 , buffer );
+        Replicator.addItem(repl);
+    }
+
     if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
        MonStats->StdioDataReplIncr();
 
@@ -1607,10 +1652,23 @@ void CRedirector::stdinFd(int nid, int pid, int &pipeFd, char filename[],
         fdMap_.insert(std::make_pair(pipeFd, redirect));
         fdMapLock_.unlock();
 
-        CReplStdinReq *repl
-            = new CReplStdinReq(nid, pid, STDIN_REQ_DATA, ancestor_nid,
-                                ancestor_pid );
-        Replicator.addItem(repl);
+        if (NameServerEnabled)
+        {
+            PtpClient->StdInReq( nid
+                               , pid
+                               , STDIN_REQ_DATA
+                               , ancestor_nid
+                               , ancestor_pid );
+        }
+        else
+        {
+            CReplStdinReq *repl = new CReplStdinReq( nid
+                                                   , pid
+                                                   , STDIN_REQ_DATA
+                                                   , ancestor_nid
+                                                   , ancestor_pid );
+            Replicator.addItem(repl);
+        }
     }
 
     TRACE_EXIT;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/replicate.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/replicate.cxx b/core/sqf/monitor/linux/replicate.cxx
index 0b6fadb..f9ebf53 100644
--- a/core/sqf/monitor/linux/replicate.cxx
+++ b/core/sqf/monitor/linux/replicate.cxx
@@ -71,6 +71,9 @@ void CReplObj::validateObj()
 #ifdef NAMESERVER_PROCESS
 struct dummy_sizeof_def {};
 #endif
+#ifndef EXCHANGE_CPU_SCHEDULING_DATA
+struct dummy1_sizeof_def {};
+#endif
 
 // Determine the maximum size of a replication object (excluding CReplEvent)
 int CReplObj::calcAllocSize()
@@ -81,7 +84,11 @@ int CReplObj::calcAllocSize()
                                                                                         sizeof(CReplNodeDelete)),
                                                                                     sizeof(CReplSoftNodeUp)),
                                                                                 sizeof(CReplSoftNodeDown)),
+#ifdef EXCHANGE_CPU_SCHEDULING_DATA
                                                                             sizeof(CReplSchedData)),
+#else
+                                                                            sizeof(dummy1_sizeof_def)),
+#endif
                                                                         sizeof(CReplActivateSpare)),
                                                                     sizeof(CReplConfigData)),
                                                                 sizeof(CReplOpen)),
@@ -1789,6 +1796,7 @@ bool CReplNodeName::replicate(struct internal_msg_def *&msg)
     return true;
 }
 
+#ifdef EXCHANGE_CPU_SCHEDULING_DATA
 CReplSchedData::CReplSchedData()
 {
     // Add eyecatcher sequence as a debugging aid
@@ -1865,6 +1873,7 @@ bool CReplSchedData::replicate(struct internal_msg_def *&msg)
 
     return true;
 }
+#endif
 
 
 CReplNodeUp::CReplNodeUp(int pnid) : pnid_(pnid)

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/replicate.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/replicate.h b/core/sqf/monitor/linux/replicate.h
index 5d4a7cc..3ae0909 100644
--- a/core/sqf/monitor/linux/replicate.h
+++ b/core/sqf/monitor/linux/replicate.h
@@ -424,6 +424,7 @@ private:
 };
 
 
+#ifdef EXCHANGE_CPU_SCHEDULING_DATA
 class CReplSchedData: public CReplObj
 {
 public:
@@ -435,6 +436,7 @@ public:
 private:
 
 };
+#endif
 
 #ifndef NAMESERVER_PROCESS
 class CReplStdioData: public CReplObj

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/reqdump.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqdump.cxx b/core/sqf/monitor/linux/reqdump.cxx
index 159219a..4e56bb5 100644
--- a/core/sqf/monitor/linux/reqdump.cxx
+++ b/core/sqf/monitor/linux/reqdump.cxx
@@ -75,7 +75,7 @@ void CExtDumpReq::performRequest()
 
     CProcess *target;
     CProcess *requester;
-    CLNode   *node;
+    CLNode   *lnode;
     string    target_process_name;
     int       target_nid = -1;
     int       target_pid = -1;
@@ -144,9 +144,10 @@ void CExtDumpReq::performRequest()
                              method_name, __LINE__, target->GetName(),
                              target->GetNid(), target->GetPid());
             target->parentContext(msg_);
-            if (node->Dump_Process(requester,
-                                   target,
-                                   msg_->u.request.u.dump.path) != SUCCESS)
+            lnode = Nodes->GetLNode(target_nid);
+            if (lnode->Dump_Process(requester,
+                                    target,
+                                    msg_->u.request.u.dump.path) != SUCCESS)
                 rc = MPI_ERR_SPAWN;
         }
         else

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/reqnewproc.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqnewproc.cxx b/core/sqf/monitor/linux/reqnewproc.cxx
index b5f9ec1..a6da01e 100644
--- a/core/sqf/monitor/linux/reqnewproc.cxx
+++ b/core/sqf/monitor/linux/reqnewproc.cxx
@@ -461,6 +461,7 @@ void CExtNewProcReq::performRequest()
                                                 programStrId,
                                                 msg_->u.request.u.new_process.infile,
                                                 msg_->u.request.u.new_process.outfile
+                                                , 0 // tag
                                                 , result
                                                 );
                 if ( process )
@@ -468,7 +469,7 @@ void CExtNewProcReq::performRequest()
                     process->userArgs (  msg_->u.request.u.new_process.argc,
                                          msg_->u.request.u.new_process.argv );
                 }
-                if ( process && process->Create(process->GetParent(), result))
+                if ( process && process->Create(process->GetParent(), 0, result))
                 {
                     MyNode->AddToNameMap(process);
                     MyNode->AddToPidMap(process->GetPid(), process);

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/reqprocinfo.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqprocinfo.cxx b/core/sqf/monitor/linux/reqprocinfo.cxx
index 566a92b..84dc3a7 100644
--- a/core/sqf/monitor/linux/reqprocinfo.cxx
+++ b/core/sqf/monitor/linux/reqprocinfo.cxx
@@ -634,99 +634,118 @@ void CExtProcInfoContReq::performRequest()
     TRACE_ENTRY;
 
 #ifndef NAMESERVER_PROCESS
-    if ( NameServerEnabled )
+    bool    getMonitorInfo = false;
+    if (strcasecmp(msg_->u.request.u.process_info.target_process_name, "MONITOR") == 0)
+    {
+        getMonitorInfo = true;
+        msg_->u.request.u.process_info.target_process_name[0] = 0;
+    }
+
+    if ( NameServerEnabled && !getMonitorInfo )
         NameServer->ProcessInfoCont(msg_); // in reqQueue thread (CExternalReq)
 #endif
 
-    int count = 0;
-    int nid;
-    int pid;
-
-    // Record statistics (sonar counters)
-    if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
-       MonStats->req_type_processinfocont_Incr();
-
-    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+#ifndef NAMESERVER_PROCESS
+    if ( NameServerEnabled && !getMonitorInfo )
     {
-        trace_printf("%s@%d request #%ld: ProcessInfoCont, context (%d, %d), "
-                     "process type=%d, allnodes=%d\n", method_name, __LINE__,
-                     id_,
-                     msg_->u.request.u.process_info_cont.context[0].nid,
-                     msg_->u.request.u.process_info_cont.context[0].pid,
-                     msg_->u.request.u.process_info_cont.type,
-                     msg_->u.request.u.process_info_cont.allNodes);
+        // Send reply to requester
+        lioreply(msg_, pid_);
     }
+    else
+    {
+#endif
+        int count = 0;
+        int nid;
+        int pid;
 
-    msg_->u.reply.u.process_info.more_data = false;
-
-    // Using context from the last reply, locate next process.
-    // Generally the final process in the last reply will still exist
-    // so we locate its CProcess object for continuation.  If that
-    // process no longer exists we try to find other processes in the
-    // context list until we find the CProcess object or run out of
-    // context.
-    int i = -1;
-    CProcess *process = 0;
+        // Record statistics (sonar counters)
+        if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
+           MonStats->req_type_processinfocont_Incr();
 
-    while (!process && ++i < MAX_PROC_CONTEXT)
-    {
-        nid = msg_->u.request.u.process_info_cont.context[i].nid;
-        pid = msg_->u.request.u.process_info_cont.context[i].pid;
-        if (nid >= 0 && nid < Nodes->GetLNodesConfigMax())
+        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
         {
-            process = Nodes->GetLNode(nid)->GetProcessL(pid);
+            trace_printf("%s@%d request #%ld: ProcessInfoCont, context (%d, %d), "
+                         "process type=%d, allnodes=%d\n", method_name, __LINE__,
+                         id_,
+                         msg_->u.request.u.process_info_cont.context[0].nid,
+                         msg_->u.request.u.process_info_cont.context[0].pid,
+                         msg_->u.request.u.process_info_cont.type,
+                         msg_->u.request.u.process_info_cont.allNodes);
         }
-    }
 
+        msg_->u.reply.u.process_info.more_data = false;
 
-    if (!process)
-    {   // Could not locate any process in the context list.  So
-        // begin with the first process in the node.
-        nid = msg_->u.request.u.process_info_cont.context[0].nid;
-        if (trace_settings & TRACE_REQUEST)
-           trace_printf("%s@%d" " could not find context process, restarting for node="  "%d" "\n", method_name, __LINE__, nid);
-        if (nid >= 0 && nid < Nodes->GetLNodesConfigMax())
+        // Using context from the last reply, locate next process.
+        // Generally the final process in the last reply will still exist
+        // so we locate its CProcess object for continuation.  If that
+        // process no longer exists we try to find other processes in the
+        // context list until we find the CProcess object or run out of
+        // context.
+        int i = -1;
+        CProcess *process = 0;
+
+        while (!process && ++i < MAX_PROC_CONTEXT)
         {
-            process = ProcessInfo_GetProcess (nid, msg_->u.request.u.process_info_cont.allNodes);
+            nid = msg_->u.request.u.process_info_cont.context[i].nid;
+            pid = msg_->u.request.u.process_info_cont.context[i].pid;
+            if (nid >= 0 && nid < Nodes->GetLNodesConfigMax())
+            {
+                process = Nodes->GetLNode(nid)->GetProcessL(pid);
+            }
         }
-    }
 
-    // Assuming we found a CProcess object resume returning data with
-    // the subsequent process.
-    if (process)
-    {
-        process = process->GetNextL();
+
         if (!process)
-        {   // We were at the last process on the node.  Get first process
-            // on the next node (if there is a next node).
-            if (++nid < Nodes->GetLNodesConfigMax())
+        {   // Could not locate any process in the context list.  So
+            // begin with the first process in the node.
+            nid = msg_->u.request.u.process_info_cont.context[0].nid;
+            if (trace_settings & TRACE_REQUEST)
+               trace_printf("%s@%d" " could not find context process, restarting for node="  "%d" "\n", method_name, __LINE__, nid);
+            if (nid >= 0 && nid < Nodes->GetLNodesConfigMax())
             {
-                process = ProcessInfo_GetProcess(nid,
-                                msg_->u.request.u.process_info_cont.allNodes);
+                process = ProcessInfo_GetProcess (nid, msg_->u.request.u.process_info_cont.allNodes);
             }
         }
 
+        // Assuming we found a CProcess object resume returning data with
+        // the subsequent process.
         if (process)
         {
-            count = ProcessInfo_BuildReply(
-                                process,
-                                msg_,
-                                msg_->u.request.u.process_info_cont.type,
-                                msg_->u.request.u.process_info_cont.allNodes,
-                                (char *) "");
+            process = process->GetNextL();
+            if (!process)
+            {   // We were at the last process on the node.  Get first process
+                // on the next node (if there is a next node).
+                if (++nid < Nodes->GetLNodesConfigMax())
+                {
+                    process = ProcessInfo_GetProcess(nid,
+                                    msg_->u.request.u.process_info_cont.allNodes);
+                }
+            }
+
+            if (process)
+            {
+                count = ProcessInfo_BuildReply(
+                                    process,
+                                    msg_,
+                                    msg_->u.request.u.process_info_cont.type,
+                                    msg_->u.request.u.process_info_cont.allNodes,
+                                    (char *) "");
 
+            }
         }
-    }
 
-    msg_->u.reply.type = ReplyType_ProcessInfo;
-    msg_->u.reply.u.process_info.num_processes = count;
-    msg_->u.reply.u.process_info.return_code = MPI_SUCCESS;
+        msg_->u.reply.type = ReplyType_ProcessInfo;
+        msg_->u.reply.u.process_info.num_processes = count;
+        msg_->u.reply.u.process_info.return_code = MPI_SUCCESS;
 
 #ifdef NAMESERVER_PROCESS
-    monreply(msg_, sockFd_);
+        monreply(msg_, sockFd_);
 #else
-    // Send reply to requester
-    lioreply(msg_, pid_);
+        // Send reply to requester
+        lioreply(msg_, pid_);
+#endif
+#ifndef NAMESERVER_PROCESS
+    }
 #endif
 
     TRACE_EXIT;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/reqqueue.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqqueue.cxx b/core/sqf/monitor/linux/reqqueue.cxx
index ac559c2..e97c0a7 100644
--- a/core/sqf/monitor/linux/reqqueue.cxx
+++ b/core/sqf/monitor/linux/reqqueue.cxx
@@ -23,6 +23,7 @@
 //
 ///////////////////////////////////////////////////////////////////////////////
 
+#include <stddef.h>
 #include <stdio.h>
 #include <zlib.h>
 #include "reqqueue.h"
@@ -40,6 +41,7 @@
 #include "internal.h"
 #include "healthcheck.h"
 #ifndef NAMESERVER_PROCESS
+#include "redirector.h"
 #include "nameserver.h"
 #include "ptpclient.h"
 #endif
@@ -59,6 +61,7 @@ extern CHealthCheck HealthCheck;
 #ifdef NAMESERVER_PROCESS
 extern char *ErrorMsg (int error_code);
 #else
+extern CRedirector Redirector;
 extern bool NameServerEnabled;
 extern CPtpClient *PtpClient;
 extern CNameServer *NameServer;
@@ -69,6 +72,10 @@ extern int req_type_startup;
 
 extern bool IAmIntegrating;
 extern bool IAmIntegrated;
+extern bool IsRealCluster;
+extern bool IsAgentMode;
+extern bool IsMaster;
+extern bool ZClientEnabled;
 
 extern CommType_t CommType;
 extern bool IsRealCluster;
@@ -1247,6 +1254,93 @@ void CIntExitNsReq::performRequest()
 #endif
 
 #ifndef NAMESERVER_PROCESS
+// Capture an InternalType_IoData message; at most MAX_SYNC_DATA data bytes are kept.
+CIntIoDataReq::CIntIoDataReq( ioData_t *ioData )
+             : CInternalReq()
+             , nid_( ioData->nid ), pid_( ioData->pid ), verifier_( ioData->verifier )
+             , ioType_( ioData->ioType )
+             // Clamp so length_ never exceeds the bytes actually copied into data_
+             , length_( (ioData->length < 0) ? 0
+                      : ((ioData->length <= MAX_SYNC_DATA) ? ioData->length : MAX_SYNC_DATA) )
+{
+    memcpy(&eyecatcher_, "RqIK", 4); // eyecatcher sequence as a debugging aid
+    memcpy(data_, ioData->data, length_);
+}
+
+CIntIoDataReq::~CIntIoDataReq()
+{
+    // Scramble the eyecatcher so a dangling pointer to this deleted request is recognizable
+    memcpy( &eyecatcher_, "rQik", 4 );
+}
+
+// Format this request's summary into requestString_; snprintf bounds the write (worst case can exceed strBuf).
+void CIntIoDataReq::populateRequestString( void )
+{
+    char strBuf[MON_STRING_BUF_SIZE/2];
+    snprintf( strBuf, sizeof(strBuf), "IntReq(%s) req #=%ld (nid=%d/pid=%d/verifier=%d), type=%d, length=%d"
+            , CReqQueue::intReqType[InternalType_IoData], getId(), nid_, pid_, verifier_, ioType_, length_ );
+    requestString_.assign( strBuf );
+}
+
+// Deliver I/O data received from a peer monitor (InternalType_IoData) to the
+// target process when it is hosted on this node.  STDIN_DATA is passed to
+// Redirector.disposeIoData() with the process's stdin fd, other types with its stdout fd.
+void CIntIoDataReq::performRequest()
+{
+    const char method_name[] = "CIntIoDataReq::performRequest";
+    TRACE_ENTRY;
+
+    // Clamp the byte count to data_'s capacity (assumed MAX_SYNC_DATA, the same
+    // bound the constructor's memcpy uses) so neither consumer below over-reads.
+    int count = (length_ < 0) ? 0 : ((length_ <= MAX_SYNC_DATA) ? length_ : MAX_SYNC_DATA);
+
+    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+    {
+        // data_ is not NUL-terminated: bound the string conversion with %.*s
+        trace_printf( "%s@%d" " - IO data "
+                      "to (%d,%d:%d), count=%d\n(%.*s)\n"
+                    , method_name, __LINE__
+                    , nid_
+                    , pid_
+                    , verifier_
+                    , length_
+                    , count, data_ );
+    }
+    // Only the node that owns nid_ acts on the data
+    if ( MyNode->IsMyNode( nid_ ) )
+    {
+        if (trace_settings & (TRACE_SYNC | TRACE_REDIRECTION))
+            trace_printf( "%s@%d - processing IO Data for (%d, %d:%d)\n"
+                        , method_name, __LINE__
+                        , nid_, pid_, verifier_ );
+
+        CLNode *lnode = Nodes->GetLNode( nid_ );
+        if ( lnode )
+        {
+            CProcess *process = lnode->GetProcessL( pid_ );
+            if (process)
+            {
+                int fd = (ioType_ == STDIN_DATA) ? process->FdStdin()
+                                                 : process->FdStdout();
+                Redirector.disposeIoData( fd, count, data_ );
+            }
+            else
+            {
+                char buf[MON_STRING_BUF_SIZE];
+                snprintf( buf, sizeof(buf)
+                        , "[%s], Can't find process nid"
+                          "=%d, pid=%d for processing IO Data.\n"
+                        , method_name, nid_, pid_ );
+                mon_log_write(MON_REQ_IODATA_1, SQ_LOG_ERR, buf);
+            }
+        }
+    }
+
+    TRACE_EXIT;
+}
+#endif
+
+#ifndef NAMESERVER_PROCESS
 CIntKillReq::CIntKillReq( struct kill_def *killDef )
             : CInternalReq()
             , nid_( killDef->nid )
@@ -1527,8 +1621,17 @@ void CIntNewProcReq::performRequest()
                                 programStrId_,
                                 &stringData_[nameLen_],  // infile
                                 &stringData_[nameLen_ + infileLen_], // outfile
+                                reqTag_,
                                 result);
-#ifndef NAMESERVER_PROCESS
+#ifdef NAMESERVER_PROCESS
+            if (  newProcess == NULL )
+            {
+                char buf[MON_STRING_BUF_SIZE];
+                sprintf( buf, "[%s], Can't create process %s (%d,%d:%d)\n"
+                       , method_name, &stringData_[0],nid_, pid_, verifier_ );
+                mon_log_write(MON_INTREQ_NEWPROC_1, SQ_LOG_ERR, buf);
+            }
+#else
             if ( newProcess != NULL )
             {
                 newProcess->userArgs ( argc_, argvLen_,
@@ -1536,27 +1639,19 @@ void CIntNewProcReq::performRequest()
                                                     + outfileLen_] );
 
                 // Create the new process (fork/exec)
-                if (newProcess->Create(newProcess->GetParent(), result))
+                if (newProcess->Create(newProcess->GetParent(), reqTag_, result))
                 {
                     MyNode->AddToNameMap( newProcess );
                     MyNode->AddToPidMap( newProcess->GetPid(),  newProcess );
 
-                    if (NameServerEnabled)
-                    {
-                        // Send actual pid and process name back to parent
-                        PtpClient->ProcessInit( newProcess
-                                              , reqTag_
-                                              , 0
-                                              , parentNid_ );
-                    }
-                    else
+                    if (!NameServerEnabled)
                     {
                         // Successfully forked process.  Replicate actual process
                         // id and process name.
                         CReplProcInit *repl
                             = new CReplProcInit(newProcess, reqTag_, 0, parentNid_);
                         Replicator.addItem(repl);
-                   }
+                    }
                 }
                 else
                 {
@@ -1592,7 +1687,7 @@ void CIntNewProcReq::performRequest()
         sprintf(buf, "[%s], Can't find parent process nid=%d, pid=%d "
                 "for process create.\n", method_name,
                 parentNid_, parentPid_ );
-        mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_10, SQ_LOG_ERR, buf);
+        mon_log_write(MON_INTREQ_NEWPROC_2, SQ_LOG_ERR, buf);
     }
 
     TRACE_EXIT;
@@ -1998,8 +2093,10 @@ void CIntProcInitReq::performRequest()
     const char method_name[] = "CIntProcInitReq::performRequest";
     TRACE_ENTRY;
 
-    if (trace_settings & TRACE_SYNC)
-        trace_printf("%s@%d - processing process init %s (%d, %d), tag %p\n", method_name, __LINE__, name_, nid_, pid_, process_);
+    if (trace_settings & (TRACE_SYNC | TRACE_PROCESS))
+        trace_printf( "%s@%d - processing process init %s (%d, %d), result=%d, tag=%p\n"
+                    , method_name, __LINE__
+                    , name_, nid_, pid_, result_, static_cast<void*>(process_) );
 
     if ( result_ != 0 )
     {  // Was unable to create the process, send response to requester
@@ -2020,9 +2117,8 @@ void CIntProcInitReq::performRequest()
         process_->SetName ( name_ );
 
         // Add to pid and name maps
-        Nodes->GetLNode (process_->GetNid())->GetNode()->
-            AddToPidMap(process_->GetPid(), process_);
-        Nodes->GetLNode (process_->GetNid())->GetNode()->AddToNameMap(process_);
+        Nodes->GetLNode( process_->GetNid() )->GetNode()->AddToPidMap(process_->GetPid(), process_);
+        Nodes->GetLNode( process_->GetNid() )->GetNode()->AddToNameMap(process_);
 
         if (process_->IsBackup())
         {
@@ -2032,7 +2128,7 @@ void CIntProcInitReq::performRequest()
             if (parent)
             {   // Set link from primary process object to
                 // this backup process object.
-                if (trace_settings & TRACE_SYNC)
+                if (trace_settings & (TRACE_SYNC | TRACE_PROCESS))
                 {
                     trace_printf("%s@%d - For backup process (%d, %d)"
                                  ", for parent (%d, %d) setting "
@@ -2125,6 +2221,131 @@ void CIntSetReq::performRequest()
     TRACE_EXIT;
 }
 
+#ifndef NAMESERVER_PROCESS
+// Build the request from the fields of the given stdin_req_def.
+CIntStdInReq::CIntStdInReq( struct stdin_req_def *stdin_req )
+             : CInternalReq()
+             , nid_( stdin_req->nid )
+             , pid_( stdin_req->pid )
+             , verifier_( stdin_req->verifier )
+             , reqType_( stdin_req->reqType )
+             , supplierNid_( stdin_req->supplier_nid )
+             , supplierPid_( stdin_req->supplier_pid )
+{
+    // Add eyecatcher sequence as a debugging aid
+    memcpy(&eyecatcher_, "RqIS", 4);
+}
+
+CIntStdInReq::~CIntStdInReq()
+{
+    // Alter eyecatcher sequence as a debugging aid to identify deleted object
+    memcpy(&eyecatcher_, "rQis", 4);
+}
+
+// Format a one-line description of this request into requestString_.
+void CIntStdInReq::populateRequestString( void )
+{
+    char strBuf[MON_STRING_BUF_SIZE/2];
+    // snprintf (not sprintf) so the text can never overrun strBuf
+    snprintf( strBuf, sizeof(strBuf)
+            , "IntReq(%s) req #=%ld (nid=%d/pid=%d/verifier=%d), "
+              "type=%d, supplier (%d,%d)"
+            , CReqQueue::intReqType[InternalType_StdinReq]
+            , getId(), nid_, pid_, verifier_, reqType_
+            , supplierNid_, supplierPid_ );
+    requestString_.assign( strBuf );
+}
+
+// Handle a stdin request from process (nid_, pid_) whose data is supplied
+// by process (supplierNid_, supplierPid_).  Nothing is done unless
+// MyNode->IsMyNode( supplierNid_ ) is true.  Otherwise:
+//   STDIN_REQ_DATA - start forwarding the supplier's stdin to the requester
+//   STDIN_FLOW_OFF - Redirector.stdinOff() on the requester's stdin fd
+//   STDIN_FLOW_ON  - Redirector.stdinOn() on the requester's stdin fd
+// There are no early returns, so every path reaches TRACE_EXIT and
+// entry/exit tracing stays balanced.
+void CIntStdInReq::performRequest()
+{
+    const char method_name[] = "CIntStdInReq::performRequest";
+    TRACE_ENTRY;
+
+    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+    {
+        trace_printf("%s@%d - stdin request from (%d,%d:%d)"
+                     ", type=%d, for supplier (%d,%d)\n"
+                    , method_name, __LINE__
+                    , nid_
+                    , pid_
+                    , verifier_
+                    , reqType_
+                    , supplierNid_
+                    , supplierPid_ );
+    }
+
+    // Only act when IsMyNode() reports the supplier's nid as this node
+    CLNode *lnode = NULL;
+    if ( MyNode->IsMyNode( supplierNid_ ) )
+    {
+        lnode = Nodes->GetLNode( nid_ );
+    }
+
+    if ( lnode != NULL )
+    {
+        CProcess *process = lnode->GetProcessL( pid_ );
+        if ( process == NULL )
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf)
+                    , "[%s], Can't find process nid=%d, "
+                      "pid=%d for stdin data request.\n"
+                    , method_name
+                    , nid_
+                    , pid_ );
+            mon_log_write(MON_REQ_STDIN_2, SQ_LOG_ERR, buf);
+        }
+        else if (reqType_ == STDIN_REQ_DATA)
+        {
+            // Set up to forward stdin data to requester.
+            // Save file descriptor associated with stdin
+            // so can find the redirector object later.
+            CLNode   *supLnode = Nodes->GetLNode( supplierNid_ );
+            CProcess *supProcess = ( supLnode != NULL )
+                                 ? supLnode->GetProcessL( supplierPid_ )
+                                 : NULL;
+            if (supProcess)
+            {
+                int fd = Redirector.stdinRemote( supProcess->infile()
+                                               , supplierNid_
+                                               , supplierPid_ );
+                process->FdStdin(fd);
+            }
+            else
+            {
+                // Supplier's logical node or process could not be found
+                char buf[MON_STRING_BUF_SIZE];
+                snprintf( buf, sizeof(buf),
+                          "[%s], Can't find supplier process "
+                          "nid=%d, pid=%d for stdin data request.\n"
+                        , method_name
+                        , supplierNid_
+                        , supplierPid_);
+                mon_log_write(MON_REQ_STDIN_1, SQ_LOG_ERR, buf);
+            }
+        }
+        else if (reqType_ == STDIN_FLOW_OFF)
+        {
+            Redirector.stdinOff(process->FdStdin());
+        }
+        else if (reqType_ == STDIN_FLOW_ON)
+        {
+            Redirector.stdinOn(process->FdStdin());
+        }
+    }
+
+    TRACE_EXIT;
+}
+#endif
+
 CIntUniqStrReq::CIntUniqStrReq( int nid, int id, const char *value )
     : CInternalReq(), nid_(nid), id_(id)
 {
@@ -3191,9 +3408,7 @@ void CIntReviveReq::performRequest()
 #ifndef NAMESERVER_PROCESS
     // unpack the current TM leader
     Monitor->SetTmLeader( header.tmLeader_ );
-#endif
 
-#ifndef NAMESERVER_PROCESS
     if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
         trace_printf( "%s@%d - TM leader (%d) unpacked\n", method_name, __LINE__
                     , Monitor->GetTmLeader() );
@@ -4409,7 +4624,17 @@ void CReqQueue::enqueueExitNsReq( struct exit_ns_def *exitDef )
 #endif
 
 #ifndef NAMESERVER_PROCESS
-//void CReqQueue::enqueueKillReq( int nid, int pid, bool abort )
+// Wrap ioData in a CIntIoDataReq and place it on the request queue.
+void CReqQueue::enqueueIoDataReq( ioData_t *ioData )
+{
+    CInternalReq *ioDataReq = new CIntIoDataReq( ioData );
+
+    // The request is handed to enqueueReq(); it is not deleted here
+    enqueueReq( ioDataReq );
+}
+#endif
+
+#ifndef NAMESERVER_PROCESS
 void CReqQueue::enqueueKillReq( struct kill_def *killDef )
 {
     CInternalReq * request;
@@ -4495,6 +4720,17 @@ void CReqQueue::enqueueSetReq( struct set_def *setDef )
     enqueueReq ( request );
 }
 
+#ifndef NAMESERVER_PROCESS
+// Wrap stdin_req in a CIntStdInReq and place it on the request queue.
+void CReqQueue::enqueueStdInReq( struct stdin_req_def *stdin_req )
+{
+    CInternalReq *stdInReq = new CIntStdInReq( stdin_req );
+
+    // The request is handed to enqueueReq(); it is not deleted here
+    enqueueReq( stdInReq );
+}
+#endif
+
 void CReqQueue::enqueueUniqStrReq( struct uniqstr_def *uniqStrDef )
 {
     CIntUniqStrReq * request;
@@ -4549,7 +4785,7 @@ void CReqQueue::enqueueTmReadyReq( int nid )
 }
 #endif
 
-// this function moves the queued requests from revieve queue to the main request queue.
+// this function moves the queued requests from revive queue to the main request queue.
 // it will skip the requests whose seq num is less than the given one.
 void CReqQueue::processReviveRequests(unsigned long long minSeqNum)
 {

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/reqqueue.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqqueue.h b/core/sqf/monitor/linux/reqqueue.h
index 995912d..34cde08 100644
--- a/core/sqf/monitor/linux/reqqueue.h
+++ b/core/sqf/monitor/linux/reqqueue.h
@@ -988,6 +988,27 @@ private:
 #endif
 
 #ifndef NAMESERVER_PROCESS
+class CIntIoDataReq: public CInternalReq
+{
+public:
+    explicit CIntIoDataReq( ioData_t *ioData );
+    virtual ~CIntIoDataReq();
+
+    void performRequest();
+
+private:
+    void populateRequestString( void );
+
+    int nid_;                         // Node id
+    int pid_;                         // Process id
+    Verifier_t verifier_;             // Process verifier
+    StdIoType ioType_;                // Kind of stdio data carried
+    int  length_;                     // Length in bytes of Data buffer used
+    char data_[MAX_SYNC_DATA];        // Data buffer; first length_ bytes used
+};
+#endif
+
+#ifndef NAMESERVER_PROCESS
 class CIntKillReq: public CInternalReq
 {
 public:
@@ -1150,6 +1171,27 @@ private:
     char value_[MAX_VALUE_SIZE_INT];
 };
 
+#ifndef NAMESERVER_PROCESS
+class CIntStdInReq: public CInternalReq
+{
+public:
+    explicit CIntStdInReq( struct stdin_req_def *stdin_req );
+    virtual ~CIntStdInReq();
+
+    void performRequest();
+
+private:
+    void populateRequestString( void );
+
+    int nid_;               // Node id of process requesting stdin data
+    int pid_;               // Process id of process requesting stdin data
+    Verifier_t verifier_;   // Verifier of process requesting stdin data
+    StdinReqType reqType_;  // e.g. STDIN_REQ_DATA, STDIN_FLOW_OFF, STDIN_FLOW_ON
+    int supplierNid_;       // Node id of process supplying stdin data
+    int supplierPid_;       // Process id of process supplying stdin data
+};
+#endif
+
 class CIntUniqStrReq: public CInternalReq
 {
 public:
@@ -1506,6 +1548,7 @@ class CReqQueue
     void enqueueDeleteReq( struct delete_def *deleteDef );
 #endif
 #ifndef NAMESERVER_PROCESS
+    void enqueueIoDataReq( ioData_t *ioData );
     void enqueueKillReq( struct kill_def *killDef );
 #endif
     void enqueueNewProcReq( struct process_def *procDef );
@@ -1515,6 +1558,9 @@ class CReqQueue
 #endif
     void enqueueProcInitReq( struct process_init_def *procInitDef );
     void enqueueSetReq( struct set_def *setDef );
+#ifndef NAMESERVER_PROCESS
+    void enqueueStdInReq( struct stdin_req_def *stdin_req );
+#endif
     void enqueueUniqStrReq( struct uniqstr_def *uniqStrDef );
 #ifndef NAMESERVER_PROCESS
     void enqueueChildDeathReq ( pid_t pid );
@@ -1637,6 +1683,7 @@ private:
       RQIH   CIntShutdownReq
       RQII   CIntProcInitReq
       RQIJ   CIntNodeAddReq
+      RqIK   CIntIoDataReq
       RQIK   CIntKillReq
       RQIL   CIntCloneProcReq
       RQIM   CIntActivateSpareReq
@@ -1646,6 +1693,7 @@ private:
       RQIQ   CIntUpReq
       RQIR   CIntReviveReq
       RQIS   CIntSetReq
+      RqIS   CIntStdInReq
       RQIT   CIntNodeDeleteReq
       RQIU   CQuiesceReq
       RQIV   CIntTmReadyReq

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/shell.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/shell.cxx b/core/sqf/monitor/linux/shell.cxx
index 20b41e6..51206d3 100644
--- a/core/sqf/monitor/linux/shell.cxx
+++ b/core/sqf/monitor/linux/shell.cxx
@@ -323,7 +323,7 @@ const char *StateString( STATE state )
         str = "Shutdown";
         break;
     case State_Initializing:
-        str = "Initing";
+        str = "Initializing";
         break;
     case State_Merged:
         str = "Merged";
@@ -3045,7 +3045,6 @@ void listZoneInfo( int nid, int zid )
 {
     int i;
     int count;
-    int  last_nid = 0;
     MPI_Status status;
 
     if ( gp_local_mon_io->acquire_msg( &msg ) != 0 )
@@ -3098,7 +3097,6 @@ void listZoneInfo( int nid, int zid )
                             if ( msg->u.reply.u.zone_info.node[i].nid != -1 )
                             {
                                 // Display zone node info
-                                last_nid = msg->u.reply.u.zone_info.node[i].nid;
                                 //      "[%s] ZID PNID State    Name\n", MyName);
                                 //      "[%s] --- ---- -------- --------\n", MyName);
                                 printf ("[%s] %3.3d  %3.3d %-8s %s\n",
@@ -5333,7 +5331,7 @@ int start_process (int *nid, PROCESSTYPE type, char *name, bool debug, int prior
     count = 0;
     while (*cmd_tail && count < MAX_ARGS)
     {
-        cmd_tail = get_token (cmd_tail, token, &delimiter, MAX_TOKEN,
+        cmd_tail = get_token (cmd_tail, token, &delimiter, (MAX_ARG_SIZE - 1),
                               false /* equal is not a delim */);
         strncpy (msg->u.request.u.new_process.argv[count], token,
                  MAX_ARG_SIZE - 1);
@@ -7309,6 +7307,9 @@ void node_cmd (char *cmd_tail)
                     sprintf( msgString, "[%s] Node delete is not available with Virtual Nodes!",MyName);
                     write_startup_log( msgString );
                     printf ("%s\n", msgString);
+                }
+                else
+                {
                     if (ElasticityEnabled)
                     {
                         // <nid> | <node-name>
@@ -8152,7 +8153,6 @@ void persist_exec_cmd( char *cmd )
     const char method_name[] = "persist_exec_cmd";
     char *cmd_tail = cmd;
     char delimiter;
-    char *ptr;
     char token[MAX_TOKEN];
     CPersistConfig *persistConfig;
 
@@ -8181,7 +8181,7 @@ void persist_exec_cmd( char *cmd )
     if (ClusterConfig.IsConfigReady())
     {
         // Parse cmd to get persist-process-prefix
-        ptr = get_token (cmd_tail, token, &delimiter);
+        get_token (cmd_tail, token, &delimiter);
         if (*token != '\0')
         {
             // Get persist process configuration
@@ -8266,7 +8266,6 @@ void persist_kill_cmd( char *cmd )
     const char method_name[] = "persist_kill_cmd";
     char *cmd_tail = cmd;
     char delimiter;
-    char *ptr;
     char token[MAX_TOKEN];
     CPersistConfig *persistConfig;
 
@@ -8286,7 +8285,7 @@ void persist_kill_cmd( char *cmd )
     if (ClusterConfig.IsConfigReady())
     {
         // Parse cmd to get persist-process-prefix
-        ptr = get_token (cmd_tail, token, &delimiter);
+        get_token (cmd_tail, token, &delimiter);
         if (*token != '\0')
         {
             // Get persist process configuration

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/system.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/system.cxx b/core/sqf/monitor/linux/system.cxx
index ca4b968..49b9878 100644
--- a/core/sqf/monitor/linux/system.cxx
+++ b/core/sqf/monitor/linux/system.cxx
@@ -586,7 +586,7 @@ void CUtility::GetOutput( char *buf, int bufSize )
             bytes = (size <= (bufSize - count)) ? size : (bufSize - count);
-            count =+ bytes;
+            count += bytes;             // running total, bounded by bufSize above
             memcpy( ptr, str.data(), bytes );
-            ptr =+ (char *)bytes;
+            ptr += bytes;               // advance past the bytes just copied
         }
     }
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/tmsync.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/tmsync.cxx b/core/sqf/monitor/linux/tmsync.cxx
index b87f0f4..e6e3a76 100644
--- a/core/sqf/monitor/linux/tmsync.cxx
+++ b/core/sqf/monitor/linux/tmsync.cxx
@@ -42,7 +42,9 @@ using namespace std;
 #include "tmsync.h"
 #include "mlio.h"
 #include "reqqueue.h"
+#include "nameserver.h"
 
+extern bool NameServerEnabled;
 extern int trace_level;
 extern int MyPNID;
 extern sigset_t SigSet;
@@ -926,6 +928,16 @@ void CTmSync_Container::SendUnsolicitedMessages (void)
                 // Get the TM that initiated the sync request
                 tm = LNode[req->Nid]->GetProcessLByType( ProcessType_DTM );
             }
+            if (!tm && NameServerEnabled)
+            {
+                if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+                {
+                    trace_printf( "%s@%d - Getting process from Name Server, nid=%d, type=ProcessType_DTM\n"
+                                , method_name, __LINE__, req->Nid );
+                }
+            
+                tm = Nodes->GetProcessLByTypeNs( req->Nid, ProcessType_DTM );
+            }
             if ( tm )
             {
                 // send all TmSync requests data to the local TM processes

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/zclient.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/zclient.cxx b/core/sqf/monitor/linux/zclient.cxx
index 0ca03b1..d2cd0be 100644
--- a/core/sqf/monitor/linux/zclient.cxx
+++ b/core/sqf/monitor/linux/zclient.cxx
@@ -577,12 +577,19 @@ const char* CZClient::WaitForAndReturnMaster( bool doWait )
     {
         if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
         {
-            trace_printf( "%s@%d (MasterMonitor) Master Monitor found (%s)\n"
-                        , method_name, __LINE__, masterMonitor.c_str() );
+            trace_printf( "%s@%d (MasterMonitor) Master Monitor found (%s/%s)\n"
+                        , method_name, __LINE__, masterMonitor.c_str(), nodes.data[0] );
         }
         TRACE_EXIT;
         return nodes.data[0];
     }
+    else
+    {
+      if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d (MasterMonitor) Master Monitor NOT found\n" , method_name, __LINE__);
+        }
+    }
 
     TRACE_EXIT;
     return NULL;
@@ -734,6 +741,67 @@ int CZClient::GetZNodeData( string &monZnode, string &nodeName, int &pnid )
     return( rc );
 }
 
+// Pop the next znode path from znodeQueue_, extract its node name (the
+// last '/'-separated path component) and, when the path lies under the
+// master znode path, pass that name to HandleAssignMonitorLeader() as the
+// failed master.
+void CZClient::HandleMasterZNode ( void )
+{
+    const char method_name[] = "CZClient::HandleMasterZNode";
+    TRACE_ENTRY;
+
+    // front()/pop_front() on an empty list is undefined behavior
+    if ( znodeQueue_.empty() )
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - znodeQueue_ is empty, nothing to handle\n"
+                        , method_name, __LINE__ );
+        }
+        TRACE_EXIT;
+        return;
+    }
+
+    string monZnode( znodeQueue_.front() );
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf("%s@%d" " - znodePath=%s, znodeQueue_.size=%ld\n"
+                    , method_name, __LINE__
+                    , monZnode.c_str(), znodeQueue_.size() );
+    }
+
+    znodeQueue_.pop_front();
+
+    // Parse with std::string rather than strtok() on fixed-size copies:
+    // strtok() keeps hidden static state (not thread-safe), the full path
+    // may not fit in MAX_PROCESSOR_NAME bytes, and a path with no
+    // component (e.g. "/") leaves strtok() with no token at all.
+    string nodeName;
+    string::size_type last = monZnode.find_last_not_of( '/' );
+    if ( last != string::npos )
+    {
+        string::size_type slash = monZnode.find_last_of( '/', last );
+        string::size_type first = ( slash == string::npos ) ? 0 : slash + 1;
+        nodeName = monZnode.substr( first, last - first + 1 );
+    }
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d nodeName=%s\n"
+                    , method_name, __LINE__
+                    , nodeName.c_str() );
+    }
+
+    string masterpath = zkRootNode_ + zkRootNodeInstance_ + ZCLIENT_MASTER_ZNODE;
+    if ( !nodeName.empty() && monZnode.find( masterpath ) != string::npos )
+    {
+        HandleAssignMonitorLeader( nodeName.c_str() );
+    }
+
+    TRACE_EXIT;
+}
+
 void CZClient::HandleExpiredZNode( void )
 {
     const char method_name[] = "CZClient::HandleExpiredZNode";
@@ -778,13 +838,23 @@ void CZClient::HandleExpiredZNode( void )
                         , strlen(nodeName) ? nodeName : "" );
         }
 
-        char buf[MON_STRING_BUF_SIZE];
-        snprintf( buf, sizeof(buf)
+        string masterpath = zkRootNode_ + zkRootNodeInstance_ + ZCLIENT_MASTER_ZNODE;
+        std::size_t found = monZnode.find(masterpath);
+        // if it is not the master node, then call HandleNodeExpiration
+        if (found==std::string::npos)
+        {    
+             char buf[MON_STRING_BUF_SIZE];
+             snprintf( buf, sizeof(buf)
                 , "[%s], %s was deleted, handling node (%s) as a down node!\n"
                 ,  method_name, monZnode.c_str(), nodeName );
-        mon_log_write(MON_ZCLIENT_CHECKZNODE_1, SQ_LOG_ERR, buf);
-
-        HandleNodeExpiration( nodeName );
+              mon_log_write(MON_ZCLIENT_CHECKZNODE_1, SQ_LOG_ERR, buf);
+         
+             HandleNodeExpiration( nodeName );
+        }
+        else // zookeeper node, assume stale
+        {
+             HandleAssignMonitorLeader(nodeName);
+        }
     }
     else
     {
@@ -1210,6 +1280,11 @@ void CZClient::MonitorZCluster()
                     HandleExpiredZNode();
                     SetState( ZC_MYZNODE );
                 }
+                // we still need to check if the master went down
+                else
+                {
+                    HandleMasterZNode(); 
+                }
                 break;
             case ZC_STOP:
                 StopClusterMonitoring();
@@ -1580,7 +1655,34 @@ void CZClient::TriggerCheck( int type, const char *znodePath )
                     , ZooConnectionTypeStr( type ) );
     }
 
-    if ( type == ZOO_CREATED_EVENT )
+    // Leader handling is only relevant in agent mode.  A znode under the
+    // master znode path identifies a master monitor; its node name (the
+    // last '/'-separated component of the path) is passed to
+    // HandleAssignMonitorLeader().  Any other znode is handled below.
+    string masterpath = zkRootNode_ + zkRootNodeInstance_ + ZCLIENT_MASTER_ZNODE;
+    string monZnode( znodePath );
+
+    if ( monZnode.find( masterpath ) != string::npos )
+    {
+        // Parse with std::string rather than strtok() on a fixed-size
+        // copy: strtok() keeps hidden static state (not thread-safe) and
+        // the full path may not fit in MAX_PROCESSOR_NAME bytes.
+        string nodeName;
+        string::size_type last = monZnode.find_last_not_of( '/' );
+        if ( last != string::npos )
+        {
+            string::size_type slash = monZnode.find_last_of( '/', last );
+            string::size_type first = ( slash == string::npos ) ? 0 : slash + 1;
+            nodeName = monZnode.substr( first, last - first + 1 );
+        }
+
+        // Nothing to report if no path component was found
+        if ( !nodeName.empty() )
+        {
+            HandleAssignMonitorLeader( nodeName.c_str() );
+        }
+    }
+    else if ( type == ZOO_CREATED_EVENT )
     {
         SetState( ZC_ZNODE, znodePath );
     }
@@ -1778,7 +1880,7 @@ int CZClient::WatchNodeMasterDelete( const char *nodeName )
     newpath.str( "" );
     newpath << zkRootNode_.c_str() 
             << zkRootNodeInstance_.c_str() 
-            << ZCLIENT_MASTER_ZNODE
+            << ZCLIENT_MASTER_ZNODE <<"/"
             << nodeName;
            
     string monZnode = newpath.str( );

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/zclient.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/zclient.h b/core/sqf/monitor/linux/zclient.h
index 916ed11..22cf730 100644
--- a/core/sqf/monitor/linux/zclient.h
+++ b/core/sqf/monitor/linux/zclient.h
@@ -119,6 +119,7 @@ typedef list<string>    ZNodeList_t;
 //   the nodeName passed in expires.
 extern void HandleMyNodeExpiration( void );
 extern void HandleNodeExpiration( const char *nodeName );
+extern void HandleAssignMonitorLeader ( const char* failedMaster );
 
 class CZClient : public CLock
 {
@@ -168,6 +169,7 @@ private:
     int     GetZNodeData( string &monZnode, string &nodeName, int &pnid );
     ZClientState_t GetState( void ) { CAutoLock lock(getLocker()); return( state_ ); }
     void    HandleExpiredZNode( void );
+    void    HandleMasterZNode ( void );
     int     InitializeZClient( void );
     bool    IsEnabled( void ) { CAutoLock lock(getLocker()); return( enabled_ ); }
     bool    IsCheckCluster( void ) { CAutoLock lock(getLocker()); return( checkCluster_ ); }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/zootest.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/zootest.cxx b/core/sqf/monitor/linux/zootest.cxx
index 94d2605..4b7a11f 100644
--- a/core/sqf/monitor/linux/zootest.cxx
+++ b/core/sqf/monitor/linux/zootest.cxx
@@ -55,6 +55,14 @@ int MyPid = -1;
 CZClient    *ZClient = NULL;
 CMonLog     *MonLog =  NULL;
 
+void HandleAssignMonitorLeader ( const char* failedMaster )
+{
+    const char method_name[] = "HandleAssignMonitorLeader";
+    TRACE_ENTRY;
+    (void)failedMaster; // test stub: parameter intentionally unused
+    TRACE_EXIT;
+}
+
 void HandleMyNodeExpiration( void )
 {
     const char method_name[] = "HandleMyNodeExpiration";

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/test/monitor.env
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/test/monitor.env b/core/sqf/monitor/test/monitor.env
index 52e2341..25029ce 100644
--- a/core/sqf/monitor/test/monitor.env
+++ b/core/sqf/monitor/test/monitor.env
@@ -19,47 +19,28 @@
 #
 # @@@ END COPYRIGHT @@@
 
-# Monitor process run mode:
-#
-#   AGENT - monitor process runs in agent mode versus MPI collective
-#
-# Uncomment the next three environment variables
-#export SQ_MON_RUN_MODE=AGENT
-MONITOR_COMM_PORT=23390
-MONITOR_SYNC_PORT=23380
-#
-#   NAME-SERVER - to disable process replication and enable name-server
-#
-# Uncomment the next six environment variables
-SQ_NAMESERVER_ENABLED=1
-NS_COMM_PORT=23370
-NS_SYNC_PORT=23360
-NS_M2N_COMM_PORT=23350
-MON2MON_COMM_PORT=23340
-MONITOR_COMM_PORT=23330
-
 # Uncomment MON_TRACE_ENABLE and specific tracing level to enable 
 # Trafodion monitor process tracing
-MON_TRACE_ENABLE=1
-MON_TRACE_EVLOG_MSG=1
-MON_TRACE_INIT=1
-MON_TRACE_RECOVERY=1
-MON_TRACE_REQUEST=1
-MON_TRACE_PROCESS=1
-MON_TRACE_NOTICE=1
-MON_TRACE_NS=1
+#MON_TRACE_ENABLE=1
+#MON_TRACE_EVLOG_MSG=1
+#MON_TRACE_INIT=1
+#MON_TRACE_RECOVERY=1
+#MON_TRACE_REQUEST=1
+#MON_TRACE_PROCESS=1
+#MON_TRACE_NOTICE=1
+#MON_TRACE_NS=1
 #MON_TRACE_SYNC=1
 # Enable TC_TRACE_* along with MON_TRACE_TRAFCONFIG for more detail
 #MON_TRACE_TRAFCONFIG=1
 #MON_TRACE_MLIO=1
 
 #MON_TRACE_REQUEST_DETAIL=1
-MON_TRACE_PROCESS_DETAIL=1
+#MON_TRACE_PROCESS_DETAIL=1
 #MON_TRACE_NOTICE_DETAIL=1
 #MON_TRACE_SYNC_DETAIL=1
 #MON_TRACE_MLIO_DETAIL=1
 
-MON_TRACE_MEAS
+#MON_TRACE_MEAS=1
 #MON_TRACE_TMSYNC=1
 #MON_TRACE_STATS=1
 #MON_TRACE_ENTRY_EXIT=1
@@ -78,4 +59,3 @@ MON_TRACE_MEAS
 #TC_TRACE_INIT=1
 #TC_TRACE_LOG_MSG=1
 #TC_TRACE_ENTRY_EXIT=1
-

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/test/sqconfig.monitor.virtual
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/test/sqconfig.monitor.virtual b/core/sqf/monitor/test/sqconfig.monitor.virtual
index 9ab56e8..ad23ec2 100644
--- a/core/sqf/monitor/test/sqconfig.monitor.virtual
+++ b/core/sqf/monitor/test/sqconfig.monitor.virtual
@@ -24,9 +24,5 @@ _virtualnodes 6
 end node
 
 begin name-server
-#nodes=0
-nodes=0,1
-#nodes=0,1,2
-#nodes=0,1,2,3
-#nodes=0,1,2,3,4,5
+nodes=0,1,2,3,4,5
 end name-server

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/sqenvcom.sh
----------------------------------------------------------------------
diff --git a/core/sqf/sqenvcom.sh b/core/sqf/sqenvcom.sh
index 59d30cd..84b2e1f 100644
--- a/core/sqf/sqenvcom.sh
+++ b/core/sqf/sqenvcom.sh
@@ -673,31 +673,31 @@ export SQ_LUNMGR_VERBOSITY=1
 # Control SQ default startup behavior (c=cold, w=warm, if removed sqstart will autocheck)
 export SQ_STARTUP=r
 
-# Monitor process creator:
+#
+# NOTE: in a Python installation when SQ_MON_RUN_MODE below
+#       is AGENT the SQ_MON_CREATOR must be MPIRUN
 #
 #   MPIRUN - monitor process is created by mpirun
+#            (meaning that mpirun is the parent process of the monitor process)
+#   AGENT  - monitor process runs in agent mode versus MPI collective
 #
-# Uncomment SQ_MON_CREATOR when running monitor in AGENT mode
+# Uncomment the next four environment variables
 #export SQ_MON_CREATOR=MPIRUN
-
-# Monitor process run mode:
-#
-#   AGENT - monitor process runs in agent mode versus MPI collective
-#
-# Uncomment the next three environment variables
 #export SQ_MON_RUN_MODE=AGENT
 #export MONITOR_COMM_PORT=23390
 #export MONITOR_SYNC_PORT=23380
+
 #
-#   NAME-SERVER - to disable process replication and enable name-server
+#   NAME-SERVER - to disable process replication and enable the name-server
 #
-# Uncomment the next six environment variables
+# Uncomment the next environment variable
 #export SQ_NAMESERVER_ENABLED=1
-#export NS_COMM_PORT=23370
-#export NS_SYNC_PORT=23360
-#export NS_M2N_COMM_PORT=23350
-#export MON2MON_COMM_PORT=23340
-#export MONITOR_COMM_PORT=23330
+if [[ "$SQ_NAMESERVER_ENABLED" == "1" ]]; then
+  export NS_COMM_PORT=23370
+  export NS_SYNC_PORT=23360
+  export NS_M2N_COMM_PORT=23350
+  export MON2MON_COMM_PORT=23340
+fi
 
 # Alternative logging capability in monitor
 export SQ_MON_ALTLOG=0

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/sql/scripts/monitor.env
----------------------------------------------------------------------
diff --git a/core/sqf/sql/scripts/monitor.env b/core/sqf/sql/scripts/monitor.env
index 2bfa4a7..25029ce 100644
--- a/core/sqf/sql/scripts/monitor.env
+++ b/core/sqf/sql/scripts/monitor.env
@@ -40,7 +40,7 @@
 #MON_TRACE_SYNC_DETAIL=1
 #MON_TRACE_MLIO_DETAIL=1
 
-#MON_TRACE_MEAS
+#MON_TRACE_MEAS=1
 #MON_TRACE_TMSYNC=1
 #MON_TRACE_STATS=1
 #MON_TRACE_ENTRY_EXIT=1

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/sql/scripts/sqconfig
----------------------------------------------------------------------
diff --git a/core/sqf/sql/scripts/sqconfig b/core/sqf/sql/scripts/sqconfig
index a753549..c45896f 100644
--- a/core/sqf/sql/scripts/sqconfig
+++ b/core/sqf/sql/scripts/sqconfig
@@ -23,9 +23,9 @@ begin node
 _virtualnodes 2
 end node
 
-#begin name-server
-#nodes=0,1
-#end name-server
+begin name-server
+nodes=0,1
+end name-server
 
 ###############################################################################
 #

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/sql/scripts/sqnameserver.pm
----------------------------------------------------------------------
diff --git a/core/sqf/sql/scripts/sqnameserver.pm b/core/sqf/sql/scripts/sqnameserver.pm
index 96572bd..87a576c 100644
--- a/core/sqf/sql/scripts/sqnameserver.pm
+++ b/core/sqf/sql/scripts/sqnameserver.pm
@@ -97,12 +97,12 @@ sub parseStatement {
         my $eq;
         ($eq, $s) = parseEq($s);
         if ($eq) {
-            while ($s =~ /([A-Za-z0-9-]+)(\s*,\s*)/) {
+            while ($s =~ /([A-Za-z0-9.\-]+)(\s*,\s*)/) {
                 my $nodeName = $1;
                 $s =~ s:$nodeName$2::;
                 push(@g_nodeNames, $nodeName);
             }
-            if ($s =~ /([A-Za-z0-9-]+)/) {
+            if ($s =~ /([A-Za-z0-9.\-]+)/) {
                 my $nodeName = $1;
                 $s =~ s:$nodeName::;
                 push(@g_nodeNames, $nodeName);

http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/sql/scripts/sqnodes.pm
----------------------------------------------------------------------
diff --git a/core/sqf/sql/scripts/sqnodes.pm b/core/sqf/sql/scripts/sqnodes.pm
index 36d8f0c..0d09565 100644
--- a/core/sqf/sql/scripts/sqnodes.pm
+++ b/core/sqf/sql/scripts/sqnodes.pm
@@ -279,10 +279,10 @@ sub verifyParse
             displayStmt($stmtOk);
             print "   Error: node-id not specified\n";
         }
-        elsif ($nodeId > 255)
+        elsif ($nodeId > 1023)
         {
             displayStmt($stmtOk);
-            print "   Error: node-id must be in the range 0..255.\n";
+            print "   Error: node-id must be in the range 0..1023.\n";
         }
         if (@cores == 0)
         {


Mime
View raw message