trafodion-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sure...@apache.org
Subject [2/6] trafodion git commit: Multiple fixes with Name Server enabled logic. - More than one Name Server is now supported - Node failures handled with with/without Name Server in node - Long process names are now the default and support clusters larger tha
Date Sun, 29 Jul 2018 04:20:09 GMT
http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/process.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/process.cxx b/core/sqf/monitor/linux/process.cxx
index 9b9ee00..a39a589 100644
--- a/core/sqf/monitor/linux/process.cxx
+++ b/core/sqf/monitor/linux/process.cxx
@@ -2589,27 +2589,46 @@ bool CProcess::Create (CProcess *parent, void* tag, int & result)
         // Take fork semaphore.  We need to wait until parent indicates
         // it is ok to proceed.  Pipes between parent and child need to
         // be set up before child can continue.
+        bool sem_log_error = false;
         int sem_rc;
+        int err = 0;
         struct timeval logTime;
         struct tm *ltime;
+        struct timespec ts;
 
-        gettimeofday(&logTime, NULL);
-        ltime = localtime(&logTime.tv_sec);
+        if (clock_gettime(CLOCK_REALTIME, &ts) == -1)
+        {
+            err = errno;
+            gettimeofday(&logTime, NULL);
+            ltime = localtime(&logTime.tv_sec);
+            snprintf(la_buf, sizeof(la_buf),
+                     "%02d/%02d/%02d-%02d:%02d:%02d "
+                     "[CProcess::Create], clock_gettime(CLOCK_REALTIME),"
+                     " Child can't get time, %s (%d), program %s, (pid=%d).\n"
+                     , ltime->tm_mon+1, ltime->tm_mday, ltime->tm_year-100, ltime->tm_hour, ltime->tm_min, ltime->tm_sec
+                     , strerror(err), err
+                     , filename, getpid());
+            write (2, la_buf, strlen(la_buf));
+        }
+        ts.tv_sec  += 1;
 
-        struct timespec ts;
-        ts.tv_sec  = 1;
-        ts.tv_nsec = 0;
         env = getenv( "MON_CREATE_SEM_DELAY" );
         if (env && isdigit(*env))
         {
             ts.tv_sec = atol(env);
         }
-        int err;
+
+        env = getenv( "MON_CREATE_SEM_LOG_ERROR" );
+        if (env && isdigit(*env))
+        {
+            int val = atoi(env);
+            sem_log_error = (val != 0) ? true : false;
+        }
         do
         {
             sem_rc = sem_timedwait(MyNode->GetMutex(), &ts);
             err = errno;
-            if ( err == ETIMEDOUT )
+            if ( sem_log_error && err == ETIMEDOUT )
             {
                 gettimeofday(&logTime, NULL);
                 ltime = localtime(&logTime.tv_sec);
@@ -2625,7 +2644,7 @@ bool CProcess::Create (CProcess *parent, void* tag, int & result)
         }
         while (sem_rc == -1 && (err == EINTR || err == ETIMEDOUT));
 
-        if ( sem_rc == -1 && !(err == EINTR || err == ETIMEDOUT))
+        if ( sem_log_error && sem_rc == -1 && !(err == EINTR || err == ETIMEDOUT))
         {
             gettimeofday(&logTime, NULL);
             ltime = localtime(&logTime.tv_sec);
@@ -3319,6 +3338,10 @@ void CProcess::Exit( CProcess *parent )
             case ProcessType_NameServer:
                 if ( IsAbended() )
                 {
+                    if (!Clone)
+                    {
+                        NameServer->NameServerExited();
+                    }
                     if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS))
                        trace_printf("%s@%d" " - NameServer abended" "\n", method_name, __LINE__);
                 }
@@ -4095,6 +4118,7 @@ void CProcess::Switch( CProcess *parent )
 CProcessContainer::CProcessContainer (void)
                   :numProcs_(0)
                   ,nodeContainer_(false)
+                  ,processNameFormatLong_(true)
                   ,nameMap_(NULL)
                   ,pidMap_(NULL)
                   ,head_(NULL)
@@ -4121,12 +4145,22 @@ CProcessContainer::CProcessContainer (void)
         abort();
     }
 
+#ifndef NAMESERVER_PROCESS
+    char *env = getenv("SQ_MON_PROCESS_NAME_FORMAT_LONG");
+    if ( env && isdigit(*env) )
+    {
+        int val = atoi(env);
+        processNameFormatLong_ = (val != 0) ? true : false;
+    }
+#endif
+
     TRACE_EXIT;
 }
 
 CProcessContainer::CProcessContainer( bool nodeContainer )
                   :numProcs_(0)
                   ,nodeContainer_(nodeContainer)
+                  ,processNameFormatLong_(true)
                   ,nameMap_(NULL)
                   ,pidMap_(NULL)
                   ,head_(NULL)
@@ -4161,6 +4195,15 @@ CProcessContainer::CProcessContainer( bool nodeContainer )
         abort();
     }
 
+#ifndef NAMESERVER_PROCESS
+    char *env = getenv("SQ_MON_PROCESS_NAME_FORMAT_LONG");
+    if ( env && isdigit(*env) )
+    {
+        int val = atoi(env);
+        processNameFormatLong_ = (val != 0) ? true : false;
+    }
+#endif
+
     if ( nodeContainer_ )
     {
         nameMap_ = new nameMap_t;
@@ -4775,46 +4818,85 @@ void CProcessContainer::Bcast (struct message_def *msg)
 
 char *CProcessContainer::BuildOurName( int nid, int pid, char *name )
 {
-    int i;
-    int rem;
-    int cnt[4];
-
     const char method_name[] = "CProcessContainer::BuildOurName";
     TRACE_ENTRY;
 
-    // Convert Pid into base 35 acsii
-    cnt[0] = pid / 42875;
-    rem = pid - ( cnt[0] * 42875 );
-    cnt[1] = rem / 1225;
-    rem -= ( cnt[1] * 1225 );
-    cnt[2] = rem / 35;
-    rem -= ( cnt[2] * 35 );
-    cnt[3] = rem;
+    int i;
+    int rem;
+    int cnt[6];
+
+    if (!processNameFormatLong_)
+    {
+        // Convert Pid into base 35 acsii
+        cnt[0] = pid / 42875;    // (35 * 35 * 35)
+        rem = pid - ( cnt[0] * 42875 );
+        cnt[1] = rem / 1225;     // (35 * 35)
+        rem -= ( cnt[1] * 1225 );
+        cnt[2] = rem / 35;
+        rem -= ( cnt[2] * 35 );
+        cnt[3] = rem;
+    
+        // Process name format long: '$Zxxpppp' xx = nid, pppp = pid
 
-    // Convert Nid into base 16 acsii
-    sprintf(name,"$Z%2.2X",nid);
-    for(i=3; i>=0; i--)
-    {
-        if( cnt[i] < 10 )
-        {
-            name[i+4] = '0'+cnt[i];
-        }
-        else
+        // Convert Nid into base 16 acsii
+        sprintf(name,"$Z%2.2X",nid);
+
+        // Convert Pid into base 36 ascii
+        for(i=3; i>=0; i--)
         {
-            cnt[i] -= 10;
-            // we are skipping cap 'o' because it looks like zero.
-            if( cnt[i] >= 14 )
+            if( cnt[i] < 10 )
             {
-                name[i+4] = 'P'+(cnt[i]-14);
+                name[i+4] = '0'+cnt[i];
             }
             else
             {
-                name[i+4] = 'A'+cnt[i];
+                cnt[i] -= 10;
+                // we are skipping cap 'o' because it looks like zero.
+                if( cnt[i] >= 14 )
+                {
+                    name[i+4] = 'P'+(cnt[i]-14);
+                }
+                else
+                {
+                    name[i+4] = 'A'+cnt[i];
+                }
             }
         }
+        name[8] = '\0';
+    }
+    else
+    {
+        // We are skipping 'A', 'I', 'O', and 'U' to distinguish between zero
+        // and one digits, and for political correctness in generated names
+        char b32table[32] =  {'0','1','2','3','4','5','6','7','8','9'
+                             ,'B','C','D','E','F','G','H','J','K','L','M'
+                             ,'N','P','Q','R','S','T','V','W','X','Y','Z' };
+    
+        // Convert Pid into base 32 ascii
+        cnt[0] = pid / 33554432;    // (32 * 32 * 32 * 32 * 32)
+        rem = pid - ( cnt[0] * 33554432 );
+        cnt[1] = rem / 1048576;     // (32 * 32 * 32 * 32)
+        rem -= ( cnt[1] * 1048576 );
+        cnt[2] = rem / 32768;       // (32 * 32 * 32)
+        rem -= ( cnt[2] * 32768 );
+        cnt[3] = rem / 1024;        // (32 * 32)
+        rem -= ( cnt[3] * 1024 );
+        cnt[4] = rem / 32;
+        rem -= ( cnt[4] * 32 );
+        cnt[5] = rem;
+    
+        // Process name format long: '$Zxxxxpppppp' xxxx = nid, pppppp = pid
+    
+        // Convert Nid into base 16 ascii
+        sprintf(name,"$Z%4.4X",nid);
+    
+        // Convert Pid into base 32 ascii
+        for(i=5; i>=0; i--)
+        {
+            name[i+6] = static_cast<char>(b32table[cnt[i]]);
+        }
+        name[12] = '\0';
     }
-    name[8] = '\0';
-
 
     TRACE_EXIT;
     return name;
@@ -5398,6 +5480,65 @@ CProcess *CProcessContainer::CreateProcess (CProcess * parent,
 }
 #endif
 
+#ifdef NAMESERVER_PROCESS
+void CProcessContainer::DeleteAllDown()
+{
+    CProcess *process  = NULL;
+    int nid = -1;
+    int pid = -1;
+
+    const char method_name[] = "CProcessContainer::DeleteAllDown";
+    TRACE_ENTRY;
+
+    nameMap_t::iterator nameMapIt;
+
+    while ( true )
+    {
+        nameMapLock_.lock();
+        nameMapIt = nameMap_->begin();
+
+        if (nameMap_->size() == 0)
+        {
+            nameMapLock_.unlock();
+            break; // all done
+        }
+
+        process = nameMapIt->second;
+
+        // Delete name map entry
+        nameMap_->erase (nameMapIt);
+
+        nameMapLock_.unlock();
+
+        nid = process->GetNid();
+        pid = process->GetPid();
+
+        if (trace_settings & (TRACE_PROCESS | TRACE_PROCESS_DETAIL))
+        {
+            trace_printf("%s@%d removed from nameMap %p: %s (%d, %d)\n",
+                         method_name, __LINE__, nameMap_,
+                         process->GetName(), nid, pid);
+        }
+
+        // Delete pid map entry
+        DelFromPidMap ( process );
+
+        if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS))
+        {
+            trace_printf( "%s@%d - Completed delete for %s (%d, %d)\n"
+                        , method_name, __LINE__
+                        , process->GetName(), nid, pid);
+        }
+
+        // Remove all processes
+        // PSD will re-create persistent processes on spare node activation
+        Exit_Process( process, true, nid );
+    }
+
+    TRACE_EXIT;
+}
+#endif
+
 void CProcessContainer::DeleteFromList( CProcess *process )
 {
     const char method_name[] = "CProcessContainer::DeleteFromList";
@@ -7086,7 +7227,9 @@ void CProcessContainer::SetProcessState( CProcess *process, STATE state, bool ab
 
                 // Note: Exit_Process() will delete the process object, so
                 //       save the process information needed before the call
+#ifndef NAMESERVER_PROCESS
                 PROCESSTYPE processType = process->GetType();
+#endif
                 string      processName = process->GetName();
                 int         processNid  = process->GetNid();
                 int         processPid  = process->GetPid();
@@ -7101,6 +7244,7 @@ void CProcessContainer::SetProcessState( CProcess *process, STATE state, bool ab
                                 , processName.c_str(), processNid, processPid, processVerifier
                                 , abend, downNode
                                 , MyNode->IsKillingNode(), MyNode->IsDTMAborted(), MyNode->IsSMSAborted());
+#ifndef NAMESERVER_PROCESS
                 if ( !MyNode->IsKillingNode() )
                 {
                     switch ( processType )
@@ -7147,6 +7291,7 @@ void CProcessContainer::SetProcessState( CProcess *process, STATE state, bool ab
                         break;
                     }
                 }
+#endif
             }
             break;
         default:

http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/process.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/process.h b/core/sqf/monitor/linux/process.h
index 3cde3e5..736ddcc 100644
--- a/core/sqf/monitor/linux/process.h
+++ b/core/sqf/monitor/linux/process.h
@@ -139,6 +139,9 @@ class CProcessContainer
                              , void *tag
                              , int & result
                              );
+#ifdef NAMESERVER_PROCESS
+    void DeleteAllDown();
+#endif
     bool Dump_Process( CProcess *dumper, CProcess *process, char *core_path );
     void DumpCallback( int nid, pid_t pid, int status );
     void Exit_Process( CProcess *process, bool abend, int downNode );
@@ -185,10 +188,12 @@ protected:
     inline void SetNumProcs( int numProcs ) { numProcs_ = numProcs; };
 
 private:
-    int       numProcs_; // Number of processes in container
+    int    numProcs_; // Number of processes in container
     sem_t *Mutex;
 
-    bool      nodeContainer_;  // true when physical node process container
+    bool   nodeContainer_;  // true when physical node process container
+    bool   processNameFormatLong_; // when true process name format is: 
+                                   // '$Zxxxxpppppp' xxxx = nid, pppppp = pid
     nameMap_t *nameMap_;
     pidMap_t *pidMap_;
     CLock pidMapLock_;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/ptpclient.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/ptpclient.cxx b/core/sqf/monitor/linux/ptpclient.cxx
index a88e2d2..39e4443 100644
--- a/core/sqf/monitor/linux/ptpclient.cxx
+++ b/core/sqf/monitor/linux/ptpclient.cxx
@@ -57,9 +57,13 @@ extern CNode *MyNode;
 extern CNodeContainer *Nodes;
 extern bool IsRealCluster;
 extern CMeas Meas;
+extern int MyPNID;
+
+#define MON2MON_IO_RETRIES 3
 
 CPtpClient::CPtpClient (void)
-          : ptpSock_(0)
+          : ptpCommPort_(0)
+          , ptpClusterSocks_(NULL)
           , seqNum_(0)
 {
     const char method_name[] = "CPtpClient::CPtpClient";
@@ -72,11 +76,10 @@ CPtpClient::CPtpClient (void)
         SetLocalHost();
     }
     
-
-    char * p = getenv( "MON2MON_COMM_PORT" );
-    if ( p ) 
+    char * env  = getenv( "MON2MON_COMM_PORT" );
+    if ( env  ) 
     {
-        basePort_ = atoi( p );
+        ptpCommPort_ = atoi( env  );
     }
     else
     {
@@ -88,6 +91,12 @@ CPtpClient::CPtpClient (void)
         abort();
     }
 
+    ptpClusterSocks_ = new int[MAX_NODES];
+    for (int i=0; i < MAX_NODES; ++i)
+    {
+        ptpClusterSocks_[i] = -1;
+    }
+
     TRACE_EXIT;
 }
 
@@ -96,18 +105,84 @@ CPtpClient::~CPtpClient (void)
     const char method_name[] = "CPtpClient::~CPtpClient";
     TRACE_ENTRY;
 
+    delete [] ptpClusterSocks_;
+
     TRACE_EXIT;
 }
 
-int  CPtpClient::AddUniqStr( int nid
-                           , int id
-                           , const char *stringValue
-                           , int targetNid
-                           , const char *targetNodeName )
+int CPtpClient::InitializePtpClient( int pnid, char * ptpPort )
+{
+    const char method_name[] = "CPtpClient::InitializePtpClient";
+    TRACE_ENTRY;
+    int err = 0;
+
+    if (ptpClusterSocks_[pnid] == -1)
+    {
+        int sock = Monitor->MkCltSock( ptpPort );                
+        if (sock < 0)
+        {
+            err = sock;
+            if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+            {
+                trace_printf( "%s@%d - MkCltSock failed with error %d\n"
+                            , method_name, __LINE__, err );
+            }
+        }
+        else
+        {
+            ptpClusterSocks_[pnid] = sock;
+            if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+            {
+                trace_printf( "%s@%d - connected to monitor node=%d(%s), sock=%d, "
+                              "ptpClusterSocks_[%d]=%d\n"
+                            , method_name, __LINE__
+                            , pnid
+                            , ptpPort
+                            , sock
+                            , pnid
+                            , ptpClusterSocks_[pnid] );
+            }
+        }
+    }
+
+    TRACE_EXIT;
+    return err;
+}
+
+bool CPtpClient::IsTargetRemote( int targetNid )
+{
+    const char method_name[] = "CPtpClient::IsTargetRemote";
+    TRACE_ENTRY;
+
+    CLNode *targetLNode = Nodes->GetLNode( targetNid );
+    CNode *targetNode = targetLNode->GetNode();
+    bool rs = (targetNode && targetNode->GetPNid() == MyPNID) ? false : true ;
+
+    TRACE_EXIT;
+    return(rs);
+}
+
+int  CPtpClient::ProcessAddUniqStr( int nid
+                                  , int id
+                                  , const char *stringValue
+                                  , int targetNid
+                                  , const char *targetNodeName )
 {
-    const char method_name[] = "CPtpClient::AddUniqStr";
+    const char method_name[] = "CPtpClient::ProcessAddUniqStr";
     TRACE_ENTRY;
 
+    if (!IsTargetRemote( targetNid ))
+    {
+        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+        {
+            trace_printf( "%s@%d - Not Sending InternalType_UniqStr request to "
+                          "local nid=%d\n"
+                        , method_name, __LINE__
+                        , targetNid );
+        }
+        return(0);
+    }
+
     if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
     {
         trace_printf( "%s@%d - Sending InternalType_UniqStr request to %s, "
@@ -129,59 +204,32 @@ int  CPtpClient::AddUniqStr( int nid
     // Copy the string
     memcpy( stringData, stringValue, stringDataLen );
 
-    int size = offsetof(struct internal_msg_def, u);
-    size += sizeof(msg.u.uniqstr);
-    size += stringDataLen;
-    
+    ptpMsgInfo_t myInfo;
+    myInfo.pnid = MyPNID;
+    myInfo.size = offsetof(struct internal_msg_def, u);
+    myInfo.size += sizeof(msg.u.uniqstr);
+    myInfo.size += stringDataLen;
+
     if (trace_settings & TRACE_PROCESS_DETAIL)
     {
         trace_printf( "%s@%d - size_=%d, forwarding unique string [%d, %d] (%s)\n"
                     , method_name, __LINE__
-                    , size
+                    , myInfo.size
                     , msg.u.uniqstr.nid
                     , msg.u.uniqstr.id
                     , &msg.u.uniqstr.valueData  );
     }
 
-    int error = SendToMon("add-unique-string", &msg, size, targetNid, targetNodeName);
+    int error = SendToMon( "process-add-unique-string"
+                         , &msg
+                         , myInfo
+                         , targetNid
+                         , targetNodeName);
     
     TRACE_EXIT;
     return error;
 }
 
-int CPtpClient::InitializePtpClient( char * ptpPort )
-{
-    const char method_name[] = "CPtpClient::InitializePtpClient";
-    TRACE_ENTRY;
-    int err = 0;
-      
-    int sock = Monitor->MkCltSock( ptpPort );                
-    if (sock < 0)
-    {
-        err = sock;
-        
-        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
-        {
-            trace_printf( "%s@%d - MkCltSock failed with error %d\n"
-                        , method_name, __LINE__, err );
-        }
-    }
-    else
-    {
-        ptpSock_ = sock;
-        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
-        {
-            trace_printf( "%s@%d - connected to monitor node=%s, sock=%d\n"
-                        , method_name, __LINE__
-                        , ptpPort
-                        , ptpSock_ );
-        }
-    }
-
-    TRACE_EXIT;
-    return err;
-}
-
 int CPtpClient::ProcessClone( CProcess *process )
 {
     const char method_name[] = "CPtpClient::ProcessClone";
@@ -209,6 +257,18 @@ int CPtpClient::ProcessClone( CProcess *process )
         return(0);
     }
 
+    if (!IsTargetRemote( process->GetParentNid() ))
+    {
+        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+        {
+            trace_printf( "%s@%d - Not Sending InternalType_Clone request to "
+                          "local nid=%d\n"
+                        , method_name, __LINE__
+                        , process->GetParentNid() );
+        }
+        return(0);
+    }
+
     if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
     {
         trace_printf( "%s@%d - Sending InternalType_Clone request to %s, parentNid=%d"
@@ -281,13 +341,15 @@ int CPtpClient::ProcessClone( CProcess *process )
     msg.u.clone.argvLen =  argvLen;
     memcpy( stringData, process->userArgv(), argvLen );
 
-    int size = offsetof(struct internal_msg_def, u);
-    size += sizeof(msg.u.clone);
-    size += nameLen ;
-    size += portLen ;
-    size += infileLen ;
-    size += outfileLen ;
-    size += argvLen ;
+    ptpMsgInfo_t myInfo;
+    myInfo.pnid = MyPNID;
+    myInfo.size = offsetof(struct internal_msg_def, u);
+    myInfo.size += sizeof(msg.u.clone);
+    myInfo.size += nameLen ;
+    myInfo.size += portLen ;
+    myInfo.size += infileLen ;
+    myInfo.size += outfileLen ;
+    myInfo.size += argvLen ;
     
     if (trace_settings & TRACE_PROCESS_DETAIL)
     {
@@ -299,7 +361,7 @@ int CPtpClient::ProcessClone( CProcess *process )
                       "outfile=%s, strlen(outfile)=%d, "
                       "argc=%d, strlen(total argv)=%d, args=[%.*s]\n"
                     , method_name, __LINE__
-                    , size
+                    , myInfo.size
                     , msg.u.clone.programStrId.nid
                     , msg.u.clone.programStrId.id
                     , msg.u.clone.pathStrId.nid
@@ -322,7 +384,7 @@ int CPtpClient::ProcessClone( CProcess *process )
 
     int error = SendToMon( "process-clone"
                          , &msg
-                         , size
+                         , myInfo
                          , process->GetParentNid()
                          , parentLNode->GetNode()->GetName());
     
@@ -337,6 +399,18 @@ int CPtpClient::ProcessExit( CProcess *process
     const char method_name[] = "CPtpClient::ProcessExit";
     TRACE_ENTRY;
 
+    if (!IsTargetRemote( targetNid ))
+    {
+        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+        {
+            trace_printf( "%s@%d - Not Sending InternalType_Exit request to "
+                          "local nid=%d\n"
+                        , method_name, __LINE__
+                        , targetNid );
+        }
+        return(0);
+    }
+
     if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
     {
         trace_printf( "%s@%d - Sending InternalType_Exit request to %s, targetNid=%d"
@@ -359,15 +433,17 @@ int CPtpClient::ProcessExit( CProcess *process
     strcpy(msg.u.exit.name, process->GetName());
     msg.u.exit.abended = process->IsAbended();
 
-    int size = offsetof(struct internal_msg_def, u);
-    size += sizeof(msg.u.exit);
+    ptpMsgInfo_t myInfo;
+    myInfo.pnid = MyPNID;
+    myInfo.size = offsetof(struct internal_msg_def, u);
+    myInfo.size += sizeof(msg.u.exit);
     
     if (trace_settings & TRACE_PROCESS_DETAIL)
     {
         trace_printf( "%s@%d - size_=%d, process %s (%d,%d:%d) "
                       "abended=%d\n"
                     , method_name, __LINE__
-                    , size
+                    , myInfo.size
                     , msg.u.exit.name
                     , msg.u.exit.nid
                     , msg.u.exit.pid
@@ -375,7 +451,11 @@ int CPtpClient::ProcessExit( CProcess *process
                     , msg.u.exit.abended );
     }
 
-    int error = SendToMon("process-exit", &msg, size, targetNid, targetNodeName);
+    int error = SendToMon( "process-exit"
+                         , &msg
+                         , myInfo
+                         , targetNid
+                         , targetNodeName);
     
     TRACE_EXIT;
     return error;
@@ -411,6 +491,18 @@ int CPtpClient::ProcessInit( CProcess *process
         return(0);
     }
 
+    if (!IsTargetRemote( process->GetParentNid() ))
+    {
+        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+        {
+            trace_printf( "%s@%d - Not Sending InternalType_ProcessInit request to "
+                          "local nid=%d\n"
+                        , method_name, __LINE__
+                        , process->GetParentNid() );
+        }
+        return(0);
+    }
+
     if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
     {
         trace_printf( "%s@%d" " - Sending InternalType_ProcessInit to parent node %s, parentNid=%d"
@@ -438,12 +530,14 @@ int CPtpClient::ProcessInit( CProcess *process
     msg.u.processInit.tag = tag;
     msg.u.processInit.origNid = process->GetParentNid();
     
-    int size = offsetof(struct internal_msg_def, u);
-    size += sizeof(msg.u.processInit);
+    ptpMsgInfo_t myInfo;
+    myInfo.pnid = MyPNID;
+    myInfo.size = offsetof(struct internal_msg_def, u);
+    myInfo.size += sizeof(msg.u.processInit);
     
     int error = SendToMon( "process-init"
                          , &msg
-                         , size
+                         , myInfo
                          , parentNid
                          , parentLNode->GetNode()->GetName() );
     
@@ -460,6 +554,18 @@ int CPtpClient::ProcessKill( CProcess *process
     const char method_name[] = "CPtpClient::ProcessKill";
     TRACE_ENTRY;
 
+    if (!IsTargetRemote( targetNid ))
+    {
+        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+        {
+            trace_printf( "%s@%d - Not Sending InternalType_Kill request to "
+                          "local nid=%d\n"
+                        , method_name, __LINE__
+                        , targetNid );
+        }
+        return(0);
+    }
+
     if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
     {
         trace_printf( "%s@%d - Sending InternalType_Kill request to %s, targetNid=%d"
@@ -480,22 +586,28 @@ int CPtpClient::ProcessKill( CProcess *process
     msg.u.kill.verifier = process->GetVerifier();
     msg.u.kill.persistent_abort = abort;
 
-    int size = offsetof(struct internal_msg_def, u);
-    size += sizeof(msg.u.exit);
+    ptpMsgInfo_t myInfo;
+    myInfo.pnid = MyPNID;
+    myInfo.size = offsetof(struct internal_msg_def, u);
+    myInfo.size += sizeof(msg.u.exit);
     
     if (trace_settings & TRACE_PROCESS_DETAIL)
     {
         trace_printf( "%s@%d - size_=%d, process (%d,%d:%d) "
                       "persistent_abort=%d\n"
                     , method_name, __LINE__
-                    , size
+                    , myInfo.size
                     , msg.u.kill.nid
                     , msg.u.kill.pid
                     , msg.u.kill.verifier
                     , msg.u.kill.persistent_abort );
     }
 
-    int error = SendToMon("process-kill", &msg, size, targetNid, targetNodeName);
+    int error = SendToMon( "process-kill"
+                         , &msg
+                         , myInfo
+                         , targetNid
+                         , targetNodeName);
     
     TRACE_EXIT;
     return error;
@@ -508,6 +620,18 @@ int CPtpClient::ProcessNew( CProcess *process
     const char method_name[] = "CPtpClient::ProcessNew";
     TRACE_ENTRY;
 
+    if (!IsTargetRemote( targetNid ))
+    {
+        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+        {
+            trace_printf( "%s@%d - Not Sending InternalType_Process request to "
+                          "local nid=%d\n"
+                        , method_name, __LINE__
+                        , targetNid );
+        }
+        return(0);
+    }
+
     if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
     {
         trace_printf( "%s@%d - Sending InternalType_Process request to %s, targetNid=%d"
@@ -567,12 +691,14 @@ int CPtpClient::ProcessNew( CProcess *process
     msg.u.process.argvLen =  argvLen;
     memcpy( stringData, process->userArgv(), argvLen );
 
-    int size = offsetof(struct internal_msg_def, u);
-    size += sizeof(msg.u.process);
-    size += nameLen ;
-    size += infileLen ;
-    size += outfileLen ;
-    size += argvLen ;
+    ptpMsgInfo_t myInfo;
+    myInfo.pnid = MyPNID;
+    myInfo.size = offsetof(struct internal_msg_def, u);
+    myInfo.size += sizeof(msg.u.process);
+    myInfo.size += nameLen ;
+    myInfo.size += infileLen ;
+    myInfo.size += outfileLen ;
+    myInfo.size += argvLen ;
     
     if (trace_settings & TRACE_PROCESS_DETAIL)
     {
@@ -583,7 +709,7 @@ int CPtpClient::ProcessNew( CProcess *process
                       "outfile=%s, strlen(outfile)=%d, "
                       "argc=%d, strlen(total argv)=%d, args=[%.*s]\n"
                     , method_name, __LINE__
-                    , size
+                    , myInfo.size
                     , msg.u.process.programStrId.nid
                     , msg.u.process.programStrId.id
                     , msg.u.process.pathStrId.nid
@@ -602,7 +728,11 @@ int CPtpClient::ProcessNew( CProcess *process
                     , &msg.u.process.stringData+nameLen+infileLen+outfileLen);
     }
 
-    int error = SendToMon("process-new", &msg, size, targetNid, targetNodeName);
+    int error = SendToMon( "process-new"
+                         , &msg
+                         , myInfo
+                         , targetNid
+                         , targetNodeName);
     
     TRACE_EXIT;
     return error;
@@ -620,6 +750,18 @@ int CPtpClient::ProcessNotify( int nid
     const char method_name[] = "CPtpClient::ProcessNotify";
     TRACE_ENTRY;
 
+    if (!IsTargetRemote( targetNid ))
+    {
+        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+        {
+            trace_printf( "%s@%d - Not Sending InternalType_Notify request to "
+                          "local nid=%d\n"
+                        , method_name, __LINE__
+                        , targetNid );
+        }
+        return(0);
+    }
+
     if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
     {
         trace_printf( "%s@%d - Sending InternalType_Notify request to %s"
@@ -682,174 +824,224 @@ int CPtpClient::ProcessNotify( int nid
         }
     }
 
-    int size = offsetof(struct internal_msg_def, u);
-    size += sizeof(msg.u.notify);
+    ptpMsgInfo_t myInfo;
+    myInfo.pnid = MyPNID;
+    myInfo.size = offsetof(struct internal_msg_def, u);
+    myInfo.size += sizeof(msg.u.notify);
 
-    int error = SendToMon("process-notify", &msg, size, targetNid, targetNodeName);
+    int error = SendToMon( "process-notify"
+                         , &msg
+                         , myInfo
+                         , targetNid
+                         , targetNodeName);
     
     TRACE_EXIT;
     return error;
 }
 
-int CPtpClient::ReceiveSock(char *buf, int size, int sockFd)
+int CPtpClient::ProcessStdInReq( int nid
+                               , int pid
+                               , StdinReqType type
+                               , int supplierNid
+                               , int supplierPid )
 {
-    const char method_name[] = "CPtpClient::ReceiveSock";
+    const char method_name[] = "CPtpClient::ProcessStdInReq";
     TRACE_ENTRY;
 
-    bool    readAgain = false;
-    int     error = 0;
-    int     readCount = 0;
-    int     received = 0;
-    int     sizeCount = size;
-       
-    do
+    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
     {
-        readCount = (int) recv( sockFd
-                              , buf
-                              , sizeCount
-                              , 0 );
-        if ( readCount > 0 ) Meas.addSockPtpRcvdBytes( readCount );
+        trace_printf( "%s@%d - Sending InternalType_StdinReq request type =%d "
+                      "from (%d,%d), for supplier (%d,%d)\n"
+                    , method_name, __LINE__
+                    , type
+                    , nid
+                    , pid
+                    , supplierNid
+                    , supplierPid );
+    }
 
-        if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
-        {
-            trace_printf( "%s@%d - Count read %d = recv(%d)\n"
-                        , method_name, __LINE__
-                        , readCount
-                        , sizeCount );
-        }
-    
-        if ( readCount > 0 )
-        { // Got data
-            received += readCount;
-            buf += readCount;
-            if ( received == size )
-            {
-                readAgain = false;
-            }
-            else
-            {
-                sizeCount -= readCount;
-                readAgain = true;
-            }
-        }
-        else if ( readCount == 0 )
-        { // EOF
-             error = ENODATA;
-             readAgain = false;
-        }
-        else
-        { // Got an error
-            if ( errno != EINTR)
-            {
-                error = errno;
-                readAgain = false;
-            }
-            else
-            {
-                readAgain = true;
-            }
-        }
+    CLNode  *lnode = Nodes->GetLNode( supplierNid );
+    if (lnode == NULL)
+    {
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], Can't find supplier node nid=%d "
+                  "for stdin data request.\n"
+                , method_name
+                , supplierNid );
+        mon_log_write(PTPCLIENT_STDINREQ_1, SQ_LOG_ERR, buf);
+
+        TRACE_EXIT;
+        return -1;
     }
-    while( readAgain );
 
-    if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
+    CProcess *process = lnode->GetProcessL( supplierPid );
+    if (process == NULL)
     {
-        trace_printf( "%s@%d - recv(), received=%d, error=%d(%s)\n"
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], Can't find process nid=%d, "
+                  "pid=%d for stdin data request.\n"
+                , method_name
+                , supplierNid
+                , supplierPid );
+        mon_log_write(PTPCLIENT_STDINREQ_2, SQ_LOG_ERR, buf);
+
+        TRACE_EXIT;
+        return -1;
+    }
+
+    struct internal_msg_def msg;
+    memset(&msg, 0, sizeof(msg)); 
+    msg.type = InternalType_StdinReq;
+    msg.u.stdin_req.nid = nid;
+    msg.u.stdin_req.pid = pid;
+    msg.u.stdin_req.reqType = type;
+    msg.u.stdin_req.supplier_nid = supplierNid;
+    msg.u.stdin_req.supplier_pid = supplierPid;
+
+    ptpMsgInfo_t myInfo;
+    myInfo.pnid = MyPNID;
+    myInfo.size = offsetof(struct internal_msg_def, u);
+    myInfo.size += sizeof(msg.u.stdin_req);
+    
+    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL))
+    {
+        trace_printf( "%s@%d - size_=%d, type =%d "
+                      "from (%d,%d), for supplier (%d,%d)\n"
                     , method_name, __LINE__
-                    , received
-                    , error, strerror(error) );
+                    , myInfo.size
+                    , msg.u.stdin_req.reqType
+                    , msg.u.stdin_req.nid
+                    , msg.u.stdin_req.pid
+                    , msg.u.stdin_req.supplier_nid
+                    , msg.u.stdin_req.supplier_pid );
     }
 
+    int error = SendToMon( "process-stdin"
+                         , &msg
+                         , myInfo
+                         , process->GetNid()
+                         , lnode->GetNode()->GetName());
+    
     TRACE_EXIT;
     return error;
 }
 
-void CPtpClient::SetLocalHost( void )
-{
-    gethostname( ptpHost_, MAX_PROCESSOR_NAME );
-}
-
-int CPtpClient::SendSock(char *buf, int size, int sockFd)
+int CPtpClient::ProcessStdIoData( int nid
+                                , int pid
+                                , StdIoType type
+                                , ssize_t count
+                                , char *data )
 {
-    const char method_name[] = "CPtpClient::SendSock";
+    const char method_name[] = "CPtpClient::ProcessStdIoData";
     TRACE_ENTRY;
-    
-    bool    sendAgain = false;
-    int     error = 0;
-    int     sendCount = 0;
-    int     sent = 0;
-    
-    do
-    {
-        sendCount = (int) send( sockFd
-                              , buf
-                              , size
-                              , 0 );
-        if ( sendCount > 0 ) Meas.addSockPtpSentBytes( sendCount );
 
-        if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
-        {
-            trace_printf( "%s@%d - send(), sendCount=%d\n"
-                        , method_name, __LINE__
-                        , sendCount );
-        }
-    
-        if ( sendCount > 0 )
-        { // Sent data
-            sent += sendCount;
-            if ( sendCount == size )
-            {
-                 sendAgain = false;
-            }
-            else
-            {
-                sendAgain = true;
-            }
-        }
-        else
-        { // Got an error
-            if ( errno != EINTR)
-            {
-                error = errno;
-                sendAgain = false;
-            }
-            else
-            {
-                sendAgain = true;
-            }
-        }
+    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+    {
+        trace_printf( "%s@%d - Sending InternalType_IoData request type =%d "
+                      "to (%d,%d), count=%ld\n"
+                    , method_name, __LINE__
+                    , type
+                    , nid
+                    , pid
+                    , count );
     }
-    while( sendAgain );
 
-    if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
+    CLNode  *lnode = Nodes->GetLNode( nid );
+    if (lnode == NULL)
     {
-        trace_printf( "%s@%d - send(), sent=%d, error=%d(%s)\n"
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], Can't find supplier node nid=%d "
+                  "for stdin data request.\n"
+                , method_name
+                , nid );
+        mon_log_write(PTPCLIENT_STDIODATA_1, SQ_LOG_ERR, buf);
+
+        TRACE_EXIT;
+        return -1;
+    }
+
+    CProcess *process = lnode->GetProcessL( pid );
+    if (process == NULL)
+    {
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], Can't find process nid=%d, "
+                  "pid=%d for stdin data request.\n"
+                , method_name
+                , nid
+                , pid );
+        mon_log_write(PTPCLIENT_STDIODATA_2, SQ_LOG_ERR, buf);
+
+        TRACE_EXIT;
+        return -1;
+    }
+
+    struct internal_msg_def msg;
+    memset(&msg, 0, sizeof(msg)); 
+    msg.type = InternalType_IoData;
+    msg.u.iodata.nid = nid ;
+    msg.u.iodata.pid = pid ;
+    msg.u.iodata.ioType = type ;
+    if (count < 0 || count > (ssize_t) sizeof(msg.u.iodata.data)) { TRACE_EXIT; return -1; } // would overrun inline data[]
+    msg.u.iodata.length = count;
+    memcpy(&msg.u.iodata.data, data, count);
+    ptpMsgInfo_t myInfo;
+    myInfo.pnid = MyPNID;
+    myInfo.size = offsetof(struct internal_msg_def, u);
+    myInfo.size += sizeof(msg.u.iodata);
+    
+    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL))
+    {
+        trace_printf( "%s@%d - size_=%d, type =%d "
+                      "to (%d,%d), count=%d\n(%s)"
                     , method_name, __LINE__
-                    , sent
-                    , error, strerror(error) );
+                    , myInfo.size
+                    , msg.u.iodata.ioType
+                    , msg.u.iodata.nid
+                    , msg.u.iodata.pid
+                    , msg.u.iodata.length
+                    , msg.u.iodata.length?msg.u.iodata.data:"\n" );
     }
 
+    int error = SendToMon( "process-stdio-data"
+                         , &msg
+                         , myInfo
+                         , process->GetNid()
+                         , lnode->GetNode()->GetName());
+
     TRACE_EXIT;
     return error;
 }
 
-int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg, int size, 
-                           int receiveNode, const char *hostName)
+int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg
+                         , ptpMsgInfo_t &myInfo
+                         , int targetNid, const char *hostName)
 {
     const char method_name[] = "CPtpClient::SendToMon";
     TRACE_ENTRY;
     
-    char monPortString[MAX_PROCESSOR_NAME];
     char ptpHost[MAX_PROCESSOR_NAME];
     char ptpPort[MAX_PROCESSOR_NAME];
-    int tempPort = basePort_;
-    
+    int error = 0;
+    int tempPort = ptpCommPort_;
+    int pnid = 0;
+    int sendSock = -1;
+    int retryCount = 0;
+    // Resolve the target's physical node up front; GetLNode() may return NULL.
+    CLNode *lnode = Nodes->GetLNode( targetNid );
+    CNode *node = (lnode != NULL) ? lnode->GetNode() : NULL;
     ptpHost[0] = '\0';
+    if (node == NULL) // unknown target nid: nothing we can send to
+        { TRACE_EXIT; return -1; }
+    pnid = node->GetPNid();
 
     // For virtual env
     if (!IsRealCluster)
     {
-        tempPort += receiveNode;
+        tempPort += targetNid;
         strcat( ptpHost, ptpHost_ );
     }
     else
@@ -859,243 +1051,245 @@ int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg, int size,
     
     if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
     {
-        trace_printf( "%s@%d - reqType=%s, hostName=%s, receiveNode=%d, "
-                      "ptpHost=%s, tempPort=%d, basePort_=%d\n"
+        trace_printf( "%s@%d - reqType=%s, hostName=%s, targetNid=%d, "
+                      "ptpHost=%s, tempPort=%d, ptpCommPort_=%d\n"
                     , method_name, __LINE__
                     , reqType
                     , hostName
-                    , receiveNode
+                    , targetNid
                     , ptpHost
                     , tempPort 
-                    , basePort_ );
+                    , ptpCommPort_ );
     }
 
     memset( &ptpPort, 0, MAX_PROCESSOR_NAME );
     memset( &ptpPortBase_, 0, MAX_PROCESSOR_NAME+100 );
+    sprintf( ptpPortBase_,"%s:", ptpHost );
+    sprintf( ptpPort,"%s%d", ptpPortBase_, tempPort );
 
-    strcat( ptpPortBase_, ptpHost );
-    strcat( ptpPortBase_, ":" );
-    sprintf( monPortString,"%d", tempPort );
-    strcat( ptpPort, ptpPortBase_ );
-    strcat( ptpPort, monPortString ); 
+retryIO:
 
-    int error = InitializePtpClient( ptpPort );
-    if (error < 0)
+    if (ptpClusterSocks_[pnid] == -1)
     {
-        TRACE_EXIT;
-        return error;
+        error = InitializePtpClient( pnid, ptpPort );
+        if (error < 0)
+        {
+            ptpClusterSocks_[pnid] = -1;
+            TRACE_EXIT;
+            return error;
+        }
     }
 
+    sendSock = ptpClusterSocks_[pnid];
+
     if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
     {
         trace_printf( "%s@%d - sending %s REQ to Monitor=%s, sock=%d\n"
                     , method_name, __LINE__
                     , reqType
                     , ptpPort
-                    , ptpSock_);
+                    , sendSock );
     }
 
-    error = SendSock((char *) &size, sizeof(size), ptpSock_);
+    error = SockSend((char *) &myInfo, sizeof(ptpMsgInfo_t), sendSock);
     if (error)
     {
-        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+        int err = error;
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], unable to send %s request size %ld to "
+                  "node %s, error: %d(%s)\n"
+                , method_name, reqType, sizeof(ptpMsgInfo_t), ptpHost, err, strerror(err) );
+        mon_log_write(PTPCLIENT_SENDTOMON_1, SQ_LOG_ERR, buf);    
+    }
+    else
+    {
+        error = SockSend((char *) msg, myInfo.size, sendSock);
+        if (error)
         {
-            trace_printf( "%s@%d - error sending to Monitor=%s, sock=%d, error=%d\n"
-                        , method_name, __LINE__
-                        , ptpPort
-                        , ptpSock_
-                        , error );
+            int err = error;
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf)
+                    , "[%s], unable to send %s request to "
+                      "node %s, error: %d(%s)\n"
+                    , method_name, reqType, ptpHost, err, strerror(err) );
+            mon_log_write(PTPCLIENT_SENDTOMON_2, SQ_LOG_ERR, buf);    
         }
     }
     
-    error = SendSock((char *) msg, size, ptpSock_);
     if (error)
     {
-        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+        SockClose( pnid );
+        if ( retryCount < MON2MON_IO_RETRIES )
         {
-            trace_printf( "%s@%d - error sending to nameserver=%s, sock=%d, error=%d\n"
-                        , method_name, __LINE__
-                        , ptpPort
-                        , ptpSock_
-                        , error );
+            retryCount++;
+            if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+            {
+                trace_printf( "%s@%d - retrying IO (%d) to node %s\n"
+                            , method_name, __LINE__
+                            , retryCount
+                            , ptpHost );
+            }
+            goto retryIO;
         }
     }
-    
-    close( ptpSock_ );
 
     TRACE_EXIT;
     return error;
 }
 
-int CPtpClient::StdInReq( int nid
-                        , int pid
-                        , StdinReqType type
-                        , int supplierNid
-                        , int supplierPid )
+void CPtpClient::SockClose( int pnid )
 {
-    const char method_name[] = "CPtpClient::StdInReq";
+    const char method_name[] = "CPtpClient::SockClose";
     TRACE_ENTRY;
 
-    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+    if (ptpClusterSocks_[pnid] != -1)
     {
-        trace_printf( "%s@%d - Sending InternalType_StdinReq request type =%d "
-                      "from (%d,%d), for supplier (%d,%d)\n"
-                    , method_name, __LINE__
-                    , type
-                    , nid
-                    , pid
-                    , supplierNid
-                    , supplierPid );
+        close( ptpClusterSocks_[pnid] );
+        ptpClusterSocks_[pnid] = -1;
     }
 
-    CLNode  *lnode = Nodes->GetLNode( supplierNid );
-    if (lnode == NULL)
-    {
-        char buf[MON_STRING_BUF_SIZE];
-        snprintf( buf, sizeof(buf)
-                , "[%s], Can't find supplier node nid=%d "
-                  "for stdin data request.\n"
-                , method_name
-                , supplierNid );
-        mon_log_write(PTPCLIENT_STDINREQ_1, SQ_LOG_ERR, buf);
+    TRACE_EXIT;
+}
 
-        TRACE_EXIT;
-        return -1;
-    }
+void CPtpClient::SetLocalHost( void )
+{   // gethostname() need not NUL-terminate a truncated name, so terminate explicitly
+    gethostname( ptpHost_, MAX_PROCESSOR_NAME ); ptpHost_[MAX_PROCESSOR_NAME - 1] = '\0';
+}
 
-    CProcess *process = lnode->GetProcessL( supplierPid );
-    if (process == NULL)
+int CPtpClient::SockReceive(char *buf, int size, int sockFd)
+{
+    const char method_name[] = "CPtpClient::SockReceive";
+    TRACE_ENTRY;
+
+    bool    readAgain = false;
+    int     error = 0;
+    int     readCount = 0;
+    int     received = 0;
+    int     sizeCount = size;
+       
+    do
     {
-        char buf[MON_STRING_BUF_SIZE];
-        snprintf( buf, sizeof(buf)
-                , "[%s], Can't find process nid=%d, "
-                  "pid=%d for stdin data request.\n"
-                , method_name
-                , supplierNid
-                , supplierPid );
-        mon_log_write(PTPCLIENT_STDINREQ_2, SQ_LOG_ERR, buf);
+        readCount = (int) recv( sockFd
+                              , buf
+                              , sizeCount
+                              , 0 );
+        if ( readCount > 0 ) Meas.addSockPtpRcvdBytes( readCount );
 
-        TRACE_EXIT;
-        return -1;
+        if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - Count read %d = recv(%d)\n"
+                        , method_name, __LINE__
+                        , readCount
+                        , sizeCount );
+        }
+    
+        if ( readCount > 0 )
+        { // Got data
+            received += readCount;
+            buf += readCount;
+            if ( received == size )
+            {
+                readAgain = false;
+            }
+            else
+            {
+                sizeCount -= readCount;
+                readAgain = true;
+            }
+        }
+        else if ( readCount == 0 )
+        { // EOF
+             error = ENODATA;
+             readAgain = false;
+        }
+        else
+        { // Got an error
+            if ( errno != EINTR)
+            {
+                error = errno;
+                readAgain = false;
+            }
+            else
+            {
+                readAgain = true;
+            }
+        }
     }
+    while( readAgain );
 
-    struct internal_msg_def msg;
-    memset(&msg, 0, sizeof(msg)); 
-    msg.type = InternalType_StdinReq;
-    msg.u.stdin_req.nid = nid;
-    msg.u.stdin_req.pid = pid;
-    msg.u.stdin_req.reqType = type;
-    msg.u.stdin_req.supplier_nid = supplierNid;
-    msg.u.stdin_req.supplier_pid = supplierPid;
-
-    int size = offsetof(struct internal_msg_def, u);
-    size += sizeof(msg.u.stdin_req);
-    
-    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL))
+    if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
     {
-        trace_printf( "%s@%d - size_=%d, type =%d "
-                      "from (%d,%d), for supplier (%d,%d)\n"
+        trace_printf( "%s@%d - recv(), received=%d, error=%d(%s)\n"
                     , method_name, __LINE__
-                    , size
-                    , msg.u.stdin_req.reqType
-                    , msg.u.stdin_req.nid
-                    , msg.u.stdin_req.pid
-                    , msg.u.stdin_req.supplier_nid
-                    , msg.u.stdin_req.supplier_pid );
+                    , received
+                    , error, strerror(error) );
     }
 
-    int error = SendToMon("stdin"
-                         , &msg
-                         , size
-                         , process->GetNid()
-                         , lnode->GetNode()->GetName());
-    
     TRACE_EXIT;
     return error;
 }
 
-int CPtpClient::StdIoData( int nid
-                         , int pid
-                         , StdIoType type
-                         , ssize_t count
-                         , char *data )
+int CPtpClient::SockSend(char *buf, int size, int sockFd)
 {
-    const char method_name[] = "CPtpClient::StdIoData";
+    const char method_name[] = "CPtpClient::SockSend";
     TRACE_ENTRY;
-
-    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
-    {
-        trace_printf( "%s@%d - Sending InternalType_IoData request type =%d "
-                      "to (%d,%d), count=%ld\n"
-                    , method_name, __LINE__
-                    , type
-                    , nid
-                    , pid
-                    , count );
-    }
-
-    CLNode  *lnode = Nodes->GetLNode( nid );
-    if (lnode == NULL)
-    {
-        char buf[MON_STRING_BUF_SIZE];
-        snprintf( buf, sizeof(buf)
-                , "[%s], Can't find supplier node nid=%d "
-                  "for stdin data request.\n"
-                , method_name
-                , nid );
-        mon_log_write(PTPCLIENT_STDIODATA_1, SQ_LOG_ERR, buf);
-
-        TRACE_EXIT;
-        return -1;
-    }
-
-    CProcess *process = lnode->GetProcessL( pid );
-    if (process == NULL)
+    
+    bool    sendAgain = false;
+    int     error = 0;
+    int     sendCount = 0;
+    int     sent = 0;
+    
+    do
     {
-        char buf[MON_STRING_BUF_SIZE];
-        snprintf( buf, sizeof(buf)
-                , "[%s], Can't find process nid=%d, "
-                  "pid=%d for stdin data request.\n"
-                , method_name
-                , nid
-                , pid );
-        mon_log_write(PTPCLIENT_STDIODATA_2, SQ_LOG_ERR, buf);
+        sendCount = (int) send( sockFd
+                              , buf + sent
+                              , size - sent
+                              , 0 );
+        if ( sendCount > 0 ) Meas.addSockPtpSentBytes( sendCount );
 
-        TRACE_EXIT;
-        return -1;
+        if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - send(), sendCount=%d\n"
+                        , method_name, __LINE__
+                        , sendCount );
+        }
+    
+        if ( sendCount > 0 )
+        { // Sent data; send() may accept only part of the remaining bytes
+            sent += sendCount;
+            if ( sent == size )
+            {
+                 sendAgain = false;
+            }
+            else
+            {
+                sendAgain = true;
+            }
+        }
+        else
+        { // Got an error
+            if ( errno != EINTR)
+            {
+                error = errno;
+                sendAgain = false;
+            }
+            else
+            {
+                sendAgain = true;
+            }
+        }
     }
+    while( sendAgain );
 
-    struct internal_msg_def msg;
-    memset(&msg, 0, sizeof(msg)); 
-    msg.type = InternalType_IoData;
-    msg.u.iodata.nid = nid ;
-    msg.u.iodata.pid = pid ;
-    msg.u.iodata.ioType = type ;
-    msg.u.iodata.length = count;
-    memcpy(&msg.u.iodata.data, data, count);
-
-    int size = offsetof(struct internal_msg_def, u);
-    size += sizeof(msg.u.iodata);
-    
-    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL))
+    if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
     {
-        trace_printf( "%s@%d - size_=%d, type =%d "
-                      "to (%d,%d), count=%d\n(%s)"
+        trace_printf( "%s@%d - send(), sent=%d, error=%d(%s)\n"
                     , method_name, __LINE__
-                    , size
-                    , msg.u.iodata.ioType
-                    , msg.u.iodata.nid
-                    , msg.u.iodata.pid
-                    , msg.u.iodata.length
-                    , msg.u.iodata.length?msg.u.iodata.data:"\n" );
+                    , sent
+                    , error, strerror(error) );
     }
 
-    int error = SendToMon("stdio-data"
-                         , &msg
-                         , size
-                         , process->GetNid()
-                         , lnode->GetNode()->GetName());
-    
     TRACE_EXIT;
     return error;
 }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/ptpclient.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/ptpclient.h b/core/sqf/monitor/linux/ptpclient.h
index e6ddeb4..5239c78 100644
--- a/core/sqf/monitor/linux/ptpclient.h
+++ b/core/sqf/monitor/linux/ptpclient.h
@@ -40,58 +40,66 @@ public:
     CPtpClient( void );
     virtual ~CPtpClient( void );
 
-    int  AddUniqStr( int nid
-                   , int id
-                   , const char *stringValue
-                   , int targetNid
-                   , const char *targetNodeName );
-    int  InitializePtpClient( char * ptpPort );
-    int  ProcessClone( CProcess *process );
+    int  InitializePtpClient( int pnid, char* ptpPort );
+    int  ProcessAddUniqStr( int nid
+                          , int id
+                          , const char* stringValue
+                          , int targetNid
+                          , const char* targetNodeName );
+    int  ProcessClone( CProcess* process );
     int  ProcessExit( CProcess* process
                     , int parentNid
-                    , const char *targetNodeName );
-    int  ProcessInit( CProcess *process
-                    , void *tag
+                    , const char* targetNodeName );
+    int  ProcessInit( CProcess* process
+                    , void* tag
                     , int result
                     , int parentNid );
     int  ProcessKill( CProcess* process
                     , bool abort
                     , int targetNid
-                    , const char *targetNodeName );
+                    , const char* targetNodeName );
     int  ProcessNew( CProcess* process
                    , int targetNid
-                   , const char *targetNodeName );
+                   , const char* targetNodeName );
     int  ProcessNotify( int nid
                       , int pid
                       , Verifier_t verifier
                       , _TM_Txid_External transId
                       , bool canceled
-                      , CProcess *targetProcess
+                      , CProcess* targetProcess
                       , int targetNid
-                      , const char *targetNodeName );
-    int  StdInReq( int nid
-                 , int pid
-                 , StdinReqType type
-                 , int supplierNid
-                 , int supplierPid );
-    int  StdIoData( int nid
-                  , int pid
-                  , StdIoType type
-                  , ssize_t count
-                  , char *data );
+                      , const char* targetNodeName );
+    int  ProcessStdInReq( int nid
+                        , int pid
+                        , StdinReqType type
+                        , int supplierNid
+                        , int supplierPid );
+    int  ProcessStdIoData( int nid
+                         , int pid
+                         , StdIoType type
+                         , ssize_t count
+                         , char* data );
 
 private:
 
-    int  basePort_;
+    int  ptpCommPort_;
     char ptpHost_[MAX_PROCESSOR_NAME];
     char ptpPortBase_[MAX_PROCESSOR_NAME+100];
-    int  ptpSock_;
+    int *ptpClusterSocks_;
     int  seqNum_;
 
-    int  ReceiveSock(char *buf, int size, int sockFd);
-    int  SendSock(char *buf, int size, int sockFd);
-    int  SendToMon(const char *reqType, internal_msg_def *msg, int size, int receiveNode, const char *hostName);
+    bool IsTargetRemote( int targetNid );
+    int  SendToMon( const char* reqType
+                  , internal_msg_def* msg
+                  , ptpMsgInfo_t &myInfo
+                  , int targetNid
+                  , const char* hostName);
     void SetLocalHost( void );
+    void SockClose( int pnid );
+    int  SockReceive(char* buf, int size, int sockFd);
+    int  SockSend( char* buf
+                 , int size
+                 , int sockFd);
 };
 
 #endif

http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/ptpcommaccept.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/ptpcommaccept.cxx b/core/sqf/monitor/linux/ptpcommaccept.cxx
index d380d3a..15933dd 100644
--- a/core/sqf/monitor/linux/ptpcommaccept.cxx
+++ b/core/sqf/monitor/linux/ptpcommaccept.cxx
@@ -47,6 +47,7 @@ extern char *ErrorMsg (int error_code);
 extern const char *StateString( STATE state);
 extern CommType_t CommType;
 
+static void *ptpProcess( void *arg );
 
 CPtpCommAccept::CPtpCommAccept()
                : accepting_(true)
@@ -71,156 +72,232 @@ void CPtpCommAccept::processNewSock( int sockFd )
 {
     const char method_name[] = "CPtpCommAccept::processNewSock";
     TRACE_ENTRY;
-    
-    struct internal_msg_def msg;
+
     int rc;
-    
-    mem_log_write(CMonLog::MON_CONNTONEWMON_2);
-    int size;
-    rc = Monitor->ReceiveSock( (char *) &size, sizeof(size), sockFd, method_name );
 
-    if ( rc )
-    {   // Handle error
-        close( sockFd );
-        char buf[MON_STRING_BUF_SIZE];
-        snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new "
-                     "monitor: %s.\n", method_name, ErrorMsg(rc));
-        mon_log_write(PTP_COMMACCEPT_1, SQ_LOG_ERR, buf);    
-        return;
-    }
-    // Get info about connecting monitor
-    rc = Monitor->ReceiveSock( (char *) &msg
-                             , size
-                             , sockFd
-                             , method_name );
-                        
-    if ( rc )
-    {   // Handle error
-        close( sockFd );
+    mem_log_write(CMonLog::MON_CONNTONEWMON_1);
+
+    // need to create context in case back-to-back accept is too fast
+    // (ownership of ctx passes to the ptpProcess thread once it is created)
+    Context *ctx = new Context();
+    ctx->this_ = this;
+    ctx->pendingFd_ = sockFd;
+    rc = pthread_create(&process_thread_id_, NULL, ptpProcess, ctx);
+    if (rc != 0)
+    {
         char buf[MON_STRING_BUF_SIZE];
-        snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new "
-                 "monitor: %s.\n", method_name, ErrorMsg(rc));
-        mon_log_write(PTP_COMMACCEPT_2, SQ_LOG_ERR, buf);    
-        return;
+        snprintf(buf, sizeof(buf), "[%s], ptpProcess thread create error=%d\n",
+                 method_name, rc);
+        mon_log_write(PTP_COMMACCEPT_1, SQ_LOG_ERR, buf);
+        // The thread never started, so nothing else will free the context
+        // or close the accepted socket.
+        delete ctx;
+        close( sockFd );
     }
-    else
+
+    TRACE_EXIT;
+}
+
+// Services one peer-monitor connection until it fails: each message is a
+// ptpMsgInfo_t header (body size and sender pnid) followed by an
+// internal_msg_def body, which is dispatched to the matching ReqQueue
+// enqueue routine.  Runs on a ptpProcess thread and closes sockFd on exit.
+void CPtpCommAccept::processMonReqs( int sockFd )
+{
+    const char method_name[] = "CPtpCommAccept::processMonReqs";
+    TRACE_ENTRY;
+
+    int rc;
+    struct internal_msg_def msg;
+
+    while ( true )
     {
-        switch ( msg.type )
+        mem_log_write(CMonLog::MON_CONNTONEWMON_2);
+        ptpMsgInfo_t remoteInfo;
+
+        // Get the message header (body size and sender pnid)
+        rc = Monitor->ReceiveSock( (char *) &remoteInfo
+                                 , sizeof(ptpMsgInfo_t)
+                                 , sockFd
+                                 , method_name );
+        if ( rc )
+        {   // Handle error
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf(buf, sizeof(buf), "[%s], unable to obtain message size and pnid "
+                         "from remote monitor: %s.\n", method_name, ErrorMsg(rc));
+            mon_log_write(PTP_COMMACCEPT_2, SQ_LOG_ERR, buf);
+            break; // leave the loop; sockFd is closed below
+        }
+
+        // The body size comes from the peer: reject a non-positive size or
+        // one larger than msg before receiving into msg.
+        if ( remoteInfo.size <= 0 ||
+             remoteInfo.size > static_cast<int>(sizeof(msg)) )
+        {   // Handle error
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf)
+                    , "[%s], invalid message size (%d) from remote "
+                      "monitor %d, maximum is %d.\n"
+                    , method_name
+                    , remoteInfo.size
+                    , remoteInfo.pnid
+                    , static_cast<int>(sizeof(msg)) );
+            mon_log_write(PTP_COMMACCEPT_3, SQ_LOG_ERR, buf);
+            break;
+        }
+
+        // Get the message body
+        rc = Monitor->ReceiveSock( (char *) &msg
+                                 , remoteInfo.size
+                                 , sockFd
+                                 , method_name );
+        if ( rc )
+        {   // Handle error
+            char buf[MON_STRING_BUF_SIZE];
+            CNode *node = Nodes->GetNode(remoteInfo.pnid);
+            snprintf( buf, sizeof(buf)
+                    , "[%s], unable to obtain message size (%d) from remote "
+                      "monitor %d(%s), error: %s.\n"
+                    , method_name
+                    , remoteInfo.size
+                    , remoteInfo.pnid
+                    , node ? node->GetName() : ""
+                    , ErrorMsg(rc));
+            mon_log_write(PTP_COMMACCEPT_3, SQ_LOG_ERR, buf);
+            break; // leave the loop; sockFd is closed below
+        }
+        else
         {
-            case InternalType_UniqStr:
+            switch ( msg.type )
             {
-                if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                case InternalType_UniqStr:
                 {
-                    trace_printf( "%s@%d" " - Received InternalType_UniqStr\n"
-                                , method_name, __LINE__ );
+                    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                    {
+                        trace_printf( "%s@%d" " - Received InternalType_UniqStr\n"
+                                    , method_name, __LINE__ );
+                    }
+                    ReqQueue.enqueueUniqStrReq( &msg.u.uniqstr);
+                    break;
                 }
-                ReqQueue.enqueueUniqStrReq( &msg.u.uniqstr);
-                break;
-            }
-            case InternalType_Process:
-            {
-                if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                case InternalType_Process:
                 {
-                    trace_printf( "%s@%d" " - Received InternalType_Process\n"
-                                , method_name, __LINE__ );
+                    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                    {
+                        trace_printf( "%s@%d" " - Received InternalType_Process\n"
+                                    , method_name, __LINE__ );
+                    }
+                    ReqQueue.enqueueNewProcReq( &msg.u.process);
+                    break;
                 }
-                ReqQueue.enqueueNewProcReq( &msg.u.process);
-                break;
-            }
-            case InternalType_ProcessInit:
-            {
-                if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                case InternalType_ProcessInit:
                 {
-                    trace_printf( "%s@%d" " - Received InternalType_ProcessInit\n"
-                                , method_name, __LINE__ );
-                }
-                if ( MyNode->IsMyNode(msg.u.processInit.origNid) )
-                {  // New process request originated on this node
-                    ReqQueue.enqueueProcInitReq( &msg.u.processInit);
+                    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                    {
+                        trace_printf( "%s@%d" " - Received InternalType_ProcessInit\n"
+                                    , method_name, __LINE__ );
+                    }
+                    if ( MyNode->IsMyNode(msg.u.processInit.origNid) )
+                    {  // New process request originated on this node
+                        ReqQueue.enqueueProcInitReq( &msg.u.processInit);
+                    }
+                    else
+                    {
+                        abort();
+                    }
+                    break;
                 }
-                else
+                case InternalType_Clone:
                 {
-                    abort();
+                    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                    {
+                        trace_printf( "%s@%d" " - Received InternalType_Clone\n"
+                                    , method_name, __LINE__ );
+                    }
+                    ReqQueue.enqueueCloneReq( &msg.u.clone );
+                    break;
                 }
-                break;
-            }
-            case InternalType_Clone:
-            {
-                if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                case InternalType_Open:
                 {
-                    trace_printf( "%s@%d" " - Received InternalType_Clone\n"
-                                , method_name, __LINE__ );
+                    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                    {
+                        trace_printf( "%s@%d" " - Received InternalType_Open\n"
+                                    , method_name, __LINE__ );
+                    }
+                    ReqQueue.enqueueOpenReq( &msg.u.open );
+                    break;
                 }
-                ReqQueue.enqueueCloneReq( &msg.u.clone );
-                break;
-            }
-            case InternalType_Open:
-            {
-                if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                case InternalType_Notify:
                 {
-                    trace_printf( "%s@%d" " - Received InternalType_Open\n"
-                                , method_name, __LINE__ );
+                    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                    {
+                        trace_printf( "%s@%d" " - Received InternalType_Notify\n"
+                                    , method_name, __LINE__ );
+                    }
+                    ReqQueue.enqueueNotifyReq( &msg.u.notify );
+                    break;
                 }
-                ReqQueue.enqueueOpenReq( &msg.u.open );
-                break;
-            }
-            case InternalType_Notify:
-            {
-                if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                case InternalType_Exit:
                 {
-                    trace_printf( "%s@%d" " - Received InternalType_Notify\n"
-                                , method_name, __LINE__ );
+                    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                    {
+                        trace_printf( "%s@%d" " - Received InternalType_Exit\n"
+                                    , method_name, __LINE__ );
+                    }
+                    ReqQueue.enqueueExitReq( &msg.u.exit );
+                    break;
                 }
-                ReqQueue.enqueueNotifyReq( &msg.u.notify );
-                break;
-            }
-            case InternalType_Exit:
-            {
-                if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                case InternalType_Kill:
                 {
-                    trace_printf( "%s@%d" " - Received InternalType_Exit\n"
-                                , method_name, __LINE__ );
+                    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                    {
+                        trace_printf( "%s@%d" " - Received InternalType_Kill\n"
+                                    , method_name, __LINE__ );
+                    }
+                    ReqQueue.enqueueKillReq( &msg.u.kill );
+                    break;
                 }
-                ReqQueue.enqueueExitReq( &msg.u.exit );
-                break;
-            }
-            case InternalType_Kill:
-            {
-                if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                case InternalType_IoData:
                 {
-                    trace_printf( "%s@%d" " - Received InternalType_Kill\n"
-                                , method_name, __LINE__ );
+                    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+                    {
+                        trace_printf( "%s@%d" " - Received InternalType_IoData\n"
+                                    , method_name, __LINE__ );
+                    }
+                    ReqQueue.enqueueIoDataReq( &msg.u.iodata );
+                    break;
                 }
-                ReqQueue.enqueueKillReq( &msg.u.kill );
-                break;
-            }
-            case InternalType_IoData:
-            {
-                if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+                case InternalType_StdinReq:
                 {
-                    trace_printf( "%s@%d" " - Received InternalType_IoData\n"
-                                , method_name, __LINE__ );
+                    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+                    {
+                        trace_printf( "%s@%d" " - Received InternalType_StdinReq\n"
+                                    , method_name, __LINE__ );
+                    }
+                    ReqQueue.enqueueStdInReq( &msg.u.stdin_req );
+                    break;
                 }
-                ReqQueue.enqueueIoDataReq( &msg.u.iodata );
-                break;
-            }
-            case InternalType_StdinReq:
-            {
-                if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+                default:
                 {
-                    trace_printf( "%s@%d" " - Received InternalType_StdinReq\n"
-                                , method_name, __LINE__ );
+                    char buf[MON_STRING_BUF_SIZE];
+                    CNode *node = Nodes->GetNode(remoteInfo.pnid);
+                    snprintf( buf, sizeof(buf)
+                            , "[%s], Invalid msg.type: %d, msg size=%d, "
+                              "remote monitor %d(%s)\n"
+                            , method_name
+                            , msg.type
+                            , remoteInfo.size
+                            , remoteInfo.pnid
+                            , node ? node->GetName() : "" );
+                    mon_log_write(PTP_COMMACCEPT_4, SQ_LOG_ERR, buf);
+                    abort();
                 }
-                ReqQueue.enqueueStdInReq( &msg.u.stdin_req );
-                break;
-            }
-            default:
-            {
-                abort();
             }
         }
     }
 
+    close( sockFd );
+
     TRACE_EXIT;
 }
 
@@ -285,7 +336,7 @@ void CPtpCommAccept::commAcceptorSock()
                 continue; // Ok to accept another connection
             }
         }
-        
+
         if (shutdown_)
         {   // We are being notified to exit.
             break;
@@ -296,12 +347,12 @@ void CPtpCommAccept::commAcceptorSock()
             char buf[MON_STRING_BUF_SIZE];
             snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n",
                      method_name, strerror(errno));
-            mon_log_write(PTP_COMMACCEPT_6, SQ_LOG_ERR, buf);
+            mon_log_write(PTP_COMMACCEPT_5, SQ_LOG_ERR, buf);
         }
         else
         {
             processNewSock( sockFd );
-            close( sockFd );
+            //close( sockFd );
         }
     }
 
@@ -334,7 +385,7 @@ void CPtpCommAccept::shutdownWork(void)
     TRACE_EXIT;
 }
 
-// Initialize PtpCommAcceptor thread
+// Initialize ptpCommAcceptor thread
 static void *ptpCommAccept(void *arg)
 {
     const char method_name[] = "ptpCommAccept";
@@ -353,7 +404,7 @@ static void *ptpCommAccept(void *arg)
         char buf[MON_STRING_BUF_SIZE];
         snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n",
                  method_name, rc);
-        mon_log_write(PTP_COMMACCEPT_7, SQ_LOG_ERR, buf);
+        mon_log_write(PTP_COMMACCEPT_6, SQ_LOG_ERR, buf);
     }
 
     // Enter thread processing loop
@@ -364,7 +415,48 @@ static void *ptpCommAccept(void *arg)
 }
 
 
-// Create a commAcceptor thread
+// Entry point of a ptpProcess thread (one is created per accepted peer
+// monitor connection by processNewSock).  Takes ownership of the
+// heap-allocated CPtpCommAccept::Context passed in arg, then services the
+// connection's requests until processMonReqs returns.
+static void *ptpProcess(void *arg)
+{
+    const char method_name[] = "ptpProcess";
+    TRACE_ENTRY;
+
+    // process_thread_id_ is overwritten for every connection, so these
+    // threads cannot all be joined; detach so each one's resources are
+    // reclaimed when it exits.  NOTE(review): confirm nothing joins it.
+    pthread_detach( pthread_self() );
+
+    // Parameter passed to the thread is a context; copy what we need and free
+    // it now, since processMonReqs runs for the life of the connection.
+    CPtpCommAccept::Context *ctx = (CPtpCommAccept::Context *) arg;
+    CPtpCommAccept *cao = ctx->this_;
+    int sockFd = ctx->pendingFd_;
+    delete ctx;
+
+    // Mask all allowed signals
+    sigset_t  mask;
+    sigfillset(&mask);
+    sigdelset(&mask, SIGPROF); // allows profiling such as google profiler
+    int rc = pthread_sigmask(SIG_SETMASK, &mask, NULL);
+    if (rc != 0)
+    {
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n",
+                 method_name, rc);
+        mon_log_write(PTP_COMMACCEPT_7, SQ_LOG_ERR, buf);
+    }
+
+    // Enter thread processing loop
+    cao->processMonReqs( sockFd );
+
+    TRACE_EXIT;
+    return NULL;
+}
+
+// Create a ptpCommAccept thread
 void CPtpCommAccept::start()
 {
     const char method_name[] = "CPtpCommAccept::start";

http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/ptpcommaccept.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/ptpcommaccept.h b/core/sqf/monitor/linux/ptpcommaccept.h
index ca58139..78e9fe0 100644
--- a/core/sqf/monitor/linux/ptpcommaccept.h
+++ b/core/sqf/monitor/linux/ptpcommaccept.h
@@ -41,12 +41,19 @@ public:
     bool isAccepting( void ) { CAutoLock lock(getLocker()); return( accepting_ ); }
     void monReqExec( void *req ); //stupid compiler and circular header files
 
+    void processMonReqs( int sockFd );
     void processNewSock( int sockFd );
     void startAccepting( void );
     void stopAccepting( void );
     void start( void );
     void shutdownWork( void );
 
+    typedef struct  // hand-off from processNewSock to its ptpProcess thread
+    {
+        CPtpCommAccept *this_;      // object whose processMonReqs is run
+        int             pendingFd_; // accepted peer-monitor socket to service
+    } Context;
+
 private:
 
     void commAcceptorSock( void );
@@ -54,9 +61,10 @@ private:
     bool accepting_;
     bool shutdown_;
 
-    // commAccept thread's id
+    // ptpCommAccept thread's id
     pthread_t                      thread_id_;
-
+    // ptpProcess thread's id
+    pthread_t                      process_thread_id_;
 };
 
 #endif

http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/redirector.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/redirector.cxx b/core/sqf/monitor/linux/redirector.cxx
index 43bb231..70e8f9c 100644
--- a/core/sqf/monitor/linux/redirector.cxx
+++ b/core/sqf/monitor/linux/redirector.cxx
@@ -564,7 +564,10 @@ CRedirectAncestorStdin::~CRedirectAncestorStdin()
     TRACE_ENTRY;
 
     // Delete pending buffer (if any)
-    delete buffer_;
+    if (buffer_)
+    {
+        delete [] buffer_;
+    }
 
     // Delete queued data (if any)
     while (!ioDataList_.empty())
@@ -572,7 +575,7 @@ CRedirectAncestorStdin::~CRedirectAncestorStdin()
         // Get first data buffer from list
         buffer_ = ioDataList_.front();
         ioDataList_.pop_front();
-        delete buffer_;
+        delete [] buffer_;
     }
 
     // Alter eyecatcher sequence as a debugging aid to identify deleted object
@@ -646,7 +649,7 @@ int CRedirectAncestorStdin::handleInput()
             retVal = -1;
 
             bufferPos_ = 0;
-            delete buffer_;
+            delete [] buffer_;
             buffer_ = NULL;
 
             reqType = STDIN_FLOW_ON;
@@ -659,7 +662,7 @@ int CRedirectAncestorStdin::handleInput()
         else
         {   // Have written all data, will need to get more.
             bufferPos_ = 0;
-            delete buffer_;
+            delete [] buffer_;
             buffer_ = NULL;
 
             reqType = STDIN_FLOW_ON;
@@ -667,11 +670,11 @@ int CRedirectAncestorStdin::handleInput()
 
         if (NameServerEnabled)
         {
-            PtpClient->StdInReq( MyPNID
-                               , pid_
-                               , reqType
-                               , ancestorNid_
-                               , ancestorPid_  );
+            PtpClient->ProcessStdInReq( MyPNID
+                                      , pid_
+                                      , reqType
+                                      , ancestorNid_
+                                      , ancestorPid_  );
         }
         else
         {
@@ -792,7 +795,7 @@ CRedirectStdinRemote::CRedirectStdinRemote(const char *filename,
                 char buf[MON_STRING_BUF_SIZE];
                 sprintf(buf, "[%s], %s is an unsupported file type.\n",
                         method_name, filename);
-                mon_log_write(MON_REDIR_STDINREMOTE_2, SQ_LOG_ERR, buf);
+                mon_log_write(MON_REDIR_STDINREMOTE_2, SQ_LOG_INFO, buf);
 
                 close(fd_);
                 fd_ = -1;
@@ -874,11 +877,11 @@ void CRedirectStdinRemote::handleOutput(ssize_t count, char *buffer)
 
     if (NameServerEnabled)
     {
-        PtpClient->StdIoData( requesterNid_
-                            , pid_
-                            , STDIN_DATA
-                            , count
-                            , buffer );
+        PtpClient->ProcessStdIoData( requesterNid_
+                                   , pid_
+                                   , STDIN_DATA
+                                   , count
+                                   , buffer );
     }
     else
     {
@@ -1177,11 +1180,11 @@ void CRedirectAncestorStdout::handleOutput(ssize_t count, char *buffer)
 
     if (NameServerEnabled)
     {
-        PtpClient->StdIoData( ancestor_nid_
-                            , ancestor_pid_
-                            , STDOUT_DATA
-                            , count
-                            , buffer );
+        PtpClient->ProcessStdIoData( ancestor_nid_
+                                   , ancestor_pid_
+                                   , STDOUT_DATA
+                                   , count
+                                   , buffer );
     }
     else
     {
@@ -1654,11 +1657,11 @@ void CRedirector::stdinFd(int nid, int pid, int &pipeFd, char filename[],
 
         if (NameServerEnabled)
         {
-            PtpClient->StdInReq( nid
-                               , pid
-                               , STDIN_REQ_DATA
-                               , ancestor_nid
-                               , ancestor_pid );
+            PtpClient->ProcessStdInReq( nid
+                                      , pid
+                                      , STDIN_REQ_DATA
+                                      , ancestor_nid
+                                      , ancestor_pid );
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/reqdump.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqdump.cxx b/core/sqf/monitor/linux/reqdump.cxx
index 5d2dd5e..fda3cea 100644
--- a/core/sqf/monitor/linux/reqdump.cxx
+++ b/core/sqf/monitor/linux/reqdump.cxx
@@ -129,8 +129,11 @@ void CExtDumpReq::performRequest()
     {
         if ( target_process_name.size() )
         { // find by name
-            targetProcess = Nodes->GetProcess( target_process_name.c_str()
-                                             , target_verifier );
+            if (msg_->u.request.u.dump.target_process_name[0] == '$' )
+            {
+                targetProcess = Nodes->GetProcess( target_process_name.c_str()
+                                                 , target_verifier );
+            }
         }
         else
         { // find by nid, pid
@@ -152,9 +155,12 @@ void CExtDumpReq::performRequest()
                                     , target_process_name.c_str()
                                     , target_verifier );
                     }
-                    cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str()
-                                                        , target_verifier );
-                    targetProcess = cloneProcess;
+                    if (msg_->u.request.u.dump.target_process_name[0] == '$' )
+                    {
+                        cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str()
+                                                            , target_verifier );
+                        targetProcess = cloneProcess;
+                    }
                 }     
                 else
                 { // Name Server find by nid,pid:verifier

http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/reqevent.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqevent.cxx b/core/sqf/monitor/linux/reqevent.cxx
index 01c9067..f86582d 100644
--- a/core/sqf/monitor/linux/reqevent.cxx
+++ b/core/sqf/monitor/linux/reqevent.cxx
@@ -163,8 +163,11 @@ void CExtEventReq::performRequest()
 
                 if ( target_process_name.size() )
                 { // find by name
-                    targetProcess = Nodes->GetProcess( target_process_name.c_str()
-                                                     , target_verifier );
+                    if (msg_->u.request.u.event.target_process_name[0] == '$' )
+                    {
+                        targetProcess = Nodes->GetProcess( target_process_name.c_str()
+                                                         , target_verifier );
+                    }
                     if ( !targetProcess )
                     {
                         if (NameServerEnabled)
@@ -176,9 +179,12 @@ void CExtEventReq::performRequest()
                                             , target_process_name.c_str()
                                             , target_verifier );
                             }
-                            cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str()
-                                                                , target_verifier );
-                            targetProcess = cloneProcess;
+                            if (msg_->u.request.u.event.target_process_name[0] == '$' )
+                            {
+                                cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str()
+                                                                    , target_verifier );
+                                targetProcess = cloneProcess;
+                            }
                         }     
                     }
                     if ( targetProcess && trace_settings & (TRACE_REQUEST | TRACE_PROCESS))

http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/reqkill.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqkill.cxx b/core/sqf/monitor/linux/reqkill.cxx
index b59cae2..a7f7b62 100644
--- a/core/sqf/monitor/linux/reqkill.cxx
+++ b/core/sqf/monitor/linux/reqkill.cxx
@@ -211,14 +211,17 @@ void CExtKillReq::performRequest()
     {
         if ( target_process_name.size() )
         { // find by name (check node state, don't check process state, not backup)
-            targetProcess = Nodes->GetProcess( target_process_name.c_str()
-                                              , target_verifier
-                                              , true, false, false );
-            if ( targetProcess &&
-                (msg_->u.request.u.kill.target_nid == -1 ||
-                 msg_->u.request.u.kill.target_pid == -1))
+            if (msg_->u.request.u.kill.target_process_name[0] == '$' )
             {
-                backup = targetProcess->GetBackup ();
+                targetProcess = Nodes->GetProcess( target_process_name.c_str()
+                                                  , target_verifier
+                                                  , true, false, false );
+                if ( targetProcess &&
+                    (msg_->u.request.u.kill.target_nid == -1 ||
+                     msg_->u.request.u.kill.target_pid == -1))
+                {
+                    backup = targetProcess->GetBackup ();
+                }
             }
         }
         else
@@ -256,9 +259,12 @@ void CExtKillReq::performRequest()
                                     , target_process_name.c_str()
                                     , target_verifier );
                     }
-                    cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str()
-                                                        , target_verifier );
-                    targetProcess = cloneProcess;
+                    if (msg_->u.request.u.kill.target_process_name[0] == '$' )
+                    {
+                        cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str()
+                                                            , target_verifier );
+                        targetProcess = cloneProcess;
+                    }
                 }     
                 else
                 { // Name Server find by nid,pid:verifier

http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/reqnotify.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqnotify.cxx b/core/sqf/monitor/linux/reqnotify.cxx
index 4d278ce..5e69681 100644
--- a/core/sqf/monitor/linux/reqnotify.cxx
+++ b/core/sqf/monitor/linux/reqnotify.cxx
@@ -180,9 +180,12 @@ void CExtNotifyReq::performRequest()
                                 , target_process_name.c_str()
                                 , target_verifier );
                 }
-                targetProcess = Nodes->GetProcess( target_process_name.c_str()
-                                                 , target_verifier
-                                                 , true, false, false );
+                if (msg_->u.request.u.notify.target_process_name[0] == '$' )
+                {
+                    targetProcess = Nodes->GetProcess( target_process_name.c_str()
+                                                     , target_verifier
+                                                     , true, false, false );
+                }
             }     
             else
             { // find by nid (check node state, don't check process state, backup is Ok)
@@ -226,8 +229,11 @@ void CExtNotifyReq::performRequest()
                                         , target_process_name.c_str()
                                         , target_verifier );
                         }
-                        targetProcess = Nodes->CloneProcessNs( target_process_name.c_str()
-                                                             , target_verifier );
+                        if (msg_->u.request.u.notify.target_process_name[0] == '$' )
+                        {
+                            targetProcess = Nodes->CloneProcessNs( target_process_name.c_str()
+                                                                 , target_verifier );
+                        }
                     }     
                     else
                     { // Name Server find by nid,pid:verifier
@@ -319,7 +325,7 @@ void CExtNotifyReq::performRequest()
             {
                 if (trace_settings & TRACE_REQUEST)
                 {
-                    trace_printf("%s@%d" " - Can't find targerProcess" "\n", method_name, __LINE__);
+                    trace_printf("%s@%d" " - Can't find targetProcess" "\n", method_name, __LINE__);
                 }
             }
         }


Mime
View raw message