Return-Path: Delivered-To: apmail-incubator-uima-commits-archive@minotaur.apache.org Received: (qmail 48237 invoked from network); 3 Oct 2009 17:47:19 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 3 Oct 2009 17:47:19 -0000 Received: (qmail 28828 invoked by uid 500); 3 Oct 2009 17:47:19 -0000 Delivered-To: apmail-incubator-uima-commits-archive@incubator.apache.org Received: (qmail 28798 invoked by uid 500); 3 Oct 2009 17:47:18 -0000 Mailing-List: contact uima-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: uima-dev@incubator.apache.org Delivered-To: mailing list uima-commits@incubator.apache.org Received: (qmail 28789 invoked by uid 99); 3 Oct 2009 17:47:18 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 03 Oct 2009 17:47:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 03 Oct 2009 17:47:12 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 98F89238893B; Sat, 3 Oct 2009 17:46:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r821374 [2/2] - in /incubator/uima/uimacpp/trunk/src/utils: ActiveMQAnalysisEngineService.cpp ActiveMQAnalysisEngineService.hpp deployCppService.cpp deployCppService.hpp Date: Sat, 03 Oct 2009 17:46:50 -0000 To: uima-commits@incubator.apache.org From: eae@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091003174650.98F89238893B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp URL: http://svn.apache.org/viewvc/incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp?rev=821374&r1=821373&r2=821374&view=diff ============================================================================== --- incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp (original) +++ incubator/uima/uimacpp/trunk/src/utils/deployCppService.hpp Sat Oct 3 17:46:49 2009 @@ -1,21 +1,21 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. * This file contains code common to the ActiveMQ and WebSphere MQ * implementations of the UIMA C++ service wrapper. @@ -31,16 +31,17 @@ #include #include #include +#include class SocketLogger; class Monitor; -class UimacppService; +class AMQAnalysisEngineService; class ServiceParameters; /* - * Constants - */ +* Constants +*/ #define PROCESS_CAS_COMMAND 2000 #define GET_META_COMMAND 2001 #define CPC_COMMAND 2002 @@ -62,396 +63,366 @@ /** - * This class holds command line parameters that - * configure this service. - */ +* This class holds command line parameters that +* configure this service. +*/ class ServiceParameters { - public: - //Constructor - - ServiceParameters() : - iv_aeDescriptor(), iv_brokerURL("tcp://localhost:61616"), - iv_queueName(), iv_numInstances(1), - iv_prefetchSize(1), iv_javaport(0), - iv_datapath(), iv_loglevel(0), iv_tracelevel(-1), - iv_errThreshhold(0), iv_errWindow(0), iv_terminateOnCPCError(false), - iv_mqHost("localhost"), iv_mqPort(1414), iv_mqChannel(), iv_mqQueueMgr(), - iv_user(), iv_password(), iv_initialfsheapsize(0), - iv_wpmEndpoints("localhost:7276:BootstrapBasicMessaging"), - iv_wpmBusname() {} - + public: - ~ServiceParameters() { - - } - - - void print() { - cout << asString() << endl; - } - + ServiceParameters() : + iv_aeDescriptor(), iv_brokerURL("tcp://localhost:61616"), + iv_queueName(), iv_numInstances(1), + iv_prefetchSize(1), iv_javaport(0), + iv_datapath(), iv_loglevel(0), iv_tracelevel(-1), + iv_errThreshhold(0), iv_errWindow(0), iv_terminateOnCPCError(false), + iv_user(), iv_password(), iv_initialfsheapsize(0) {} + + ~ServiceParameters() {} + + void print() { + cout << asString() << endl; + } string ServiceParameters::asString() { stringstream str; str << "AE descriptor " << iv_aeDescriptor - << " Initial FSHeap size " << this->iv_initialfsheapsize - << " Input Queue " << iv_queueName - << " Num Instances " << iv_numInstances - << " Prefetch size " << iv_prefetchSize - << " ActiveMQ broker URL " << iv_brokerURL - << " MQ host " << iv_mqHost - << " MQ port " << iv_mqPort - << " MQ qmgr " << iv_mqQueueMgr - << " MQ channel " << iv_mqChannel - << " WAS bootstrap service address " << iv_wpmEndpoints - << " WAS SIB name " << iv_wpmBusname - << " Java port " << iv_javaport - << " logging level " << iv_loglevel - << " trace level " << iv_tracelevel - << " datapath " << iv_datapath << endl; - return str.str(); - } - - const string & getBrokerURL() const { - return iv_brokerURL; - } - - const string & getQueueName() const { - return iv_queueName; - } - - const string & getAEDescriptor() const { - return iv_aeDescriptor; - } - - const int getTraceLevel() const { - return iv_tracelevel; - } - const int getLoggingLevel() const { - return iv_loglevel; - } - const int getNumberOfInstances() const { - return iv_numInstances; - } - const int getPrefetchSize() const { - return iv_prefetchSize; - } - const int getJavaPort() const { - return iv_javaport; - } - - const string & getDataPath() { - return iv_datapath; - } + << " Initial FSHeap size " << this->iv_initialfsheapsize + << " Input Queue " << iv_queueName + << " Num Instances " << iv_numInstances + << " Prefetch size " << iv_prefetchSize + << " ActiveMQ broker URL " << iv_brokerURL + << " Java port " << iv_javaport + << " logging level " << iv_loglevel + << " trace level " << iv_tracelevel + << " datapath " << iv_datapath << endl; + return str.str(); + } + + const string & getBrokerURL() const { + return iv_brokerURL; + } + + const string & getQueueName() const { + return iv_queueName; + } + + const string & getAEDescriptor() const { + return iv_aeDescriptor; + } + + const int getTraceLevel() const { + return iv_tracelevel; + } + const int getLoggingLevel() const { + return iv_loglevel; + } + const int getNumberOfInstances() const { + return iv_numInstances; + } + const int getPrefetchSize() const { + return iv_prefetchSize; + } + const int getJavaPort() const { + return iv_javaport; + } + + const string & getDataPath() { + return iv_datapath; + } const int getErrorThreshhold() { - return iv_errThreshhold; - } + return iv_errThreshhold; + } const int getErrorWindow() { - return iv_errWindow; - } + return iv_errWindow; + } const bool terminatOnCPCError() { - return iv_terminateOnCPCError; - } - - const string & getMQHost() { - return iv_mqHost; - } - - const int getMQPort() { - return iv_mqPort; - } - - const string & getMQChannel() { - return iv_mqChannel; - } - - const string & getMQQueueMgr() { - return iv_mqQueueMgr; - } + return iv_terminateOnCPCError; + } const string & getUserName() { - return iv_user; - } + return iv_user; + } const string & getPassword() { - return iv_password; - } - - const string & getWPMEndpoints() { - return iv_wpmEndpoints; - } - - const string & getWPMBusname() { - return iv_wpmBusname; - } + return iv_password; + } const size_t getInitialFSHeapSize() { return iv_initialfsheapsize; } - void setBrokerURL(const string url) { - iv_brokerURL = url; - } - - void setQueueName(const string qname) { - iv_queueName = qname; - } - - void setAEDescriptor(const string loc) { - iv_aeDescriptor = loc; - } - - void setNumberOfInstances(int num) { - iv_numInstances=num; - } - - void setTraceLevel(int num) { - iv_tracelevel=num; - } - - void setLoggingLevel(int num) { - iv_loglevel=num; - } - void setJavaPort(int val) { - iv_javaport = val; - } - - void setDataPath(const string path) { - iv_datapath=path; - } - - void parseArgs(int argc, char* argv[]) { - int index =0; - while (++index < argc) { - char * arg = argv[index]; - if ( 0 == strcmp(arg, "-b") ) { - if (++index < argc) { - this->iv_brokerURL = argv[index]; - } - } else if (0 == strcmp(arg, "-n")) { - if (++index < argc) { - this->iv_numInstances = atoi(argv[index]); - } - } else if (0 == strcmp(arg, "-fsheapsz")) { - if (++index < argc) { - this->iv_initialfsheapsize = atoi(argv[index]); + void setBrokerURL(const string url) { + iv_brokerURL = url; + } + + void setQueueName(const string qname) { + iv_queueName = qname; + } + + void setAEDescriptor(const string loc) { + iv_aeDescriptor = loc; + } + + void setNumberOfInstances(int num) { + iv_numInstances=num; + } + + void setTraceLevel(int num) { + iv_tracelevel=num; + } + + void setLoggingLevel(int num) { + iv_loglevel=num; + } + void setJavaPort(int val) { + iv_javaport = val; + } + + void setDataPath(const string path) { + iv_datapath=path; + } + + void parseArgs(int argc, char* argv[]) { + int index =0; + while (++index < argc) { + char * arg = argv[index]; + if ( 0 == strcmp(arg, "-b") ) { + if (++index < argc) { + this->iv_brokerURL = argv[index]; + } + } else if (0 == strcmp(arg, "-n")) { + if (++index < argc) { + this->iv_numInstances = atoi(argv[index]); + } + } else if (0 == strcmp(arg, "-fsheapsz")) { + if (++index < argc) { + this->iv_initialfsheapsize = atoi(argv[index]); this->iv_initialfsheapsize = this->iv_initialfsheapsize/4; - } - } else if (0 == strcmp(arg, "-p")) { - if (++index < argc) { - this->iv_prefetchSize = atoi(argv[index]); - } - } else if (0 == strcmp(arg, "-t")) { - if (++index < argc) { - this->iv_tracelevel = atoi(argv[index]); - } - } else if (0 == strcmp(arg, "-l")) { - if (++index < argc) { - this->iv_loglevel = atoi(argv[index]); - } - } else if (0 == strcmp(arg, "-jport")) { - if (++index < argc) { - this->iv_javaport = atoi(argv[index]); - } - } else if (0 == strcmp(arg, "-d")) { - if (++index < argc) { - this->iv_datapath = argv[index]; - } - } else if (0 == strcmp(arg, "-e")) { - if (++index < argc) { - this->iv_errThreshhold = atoi(argv[index]); - } - } else if (0 == strcmp(arg, "-w")) { - if (++index < argc) { - this->iv_errWindow = atoi(argv[index]); - } - } else if (0 == strcmp(arg, "-mqh")) { - if (++index < argc) { - this->iv_mqHost = argv[index]; - } - } else if (0 == strcmp(arg, "-mqp")) { - if (++index < argc) { - this->iv_mqPort = atoi(argv[index]); - } - } else if (0 == strcmp(arg, "-mqc")) { - if (++index < argc) { - this->iv_mqChannel = argv[index]; - } - } else if (0 == strcmp(arg, "-mqm")) { - if (++index < argc) { - this->iv_mqQueueMgr = argv[index]; - } - } else if (0 == strcmp(arg, "-user")) { - if (++index < argc) { - this->iv_user = argv[index]; - } - } else if (0 == strcmp(arg, "-pw")) { - if (++index < argc) { - this->iv_password = argv[index]; - } - } else if (0 == strcmp(arg, "-wash")) { - if (++index < argc) { - this->iv_wpmEndpoints = argv[index]; - } - } else if (0 == strcmp(arg, "-wasb")) { - if (++index < argc) { - this->iv_wpmBusname = argv[index]; - } - } else if (0 == strcmp(arg, "-a")) { - if (++index < argc) { + } + } else if (0 == strcmp(arg, "-p")) { + if (++index < argc) { + this->iv_prefetchSize = atoi(argv[index]); + } + } else if (0 == strcmp(arg, "-t")) { + if (++index < argc) { + this->iv_tracelevel = atoi(argv[index]); + } + } else if (0 == strcmp(arg, "-l")) { + if (++index < argc) { + this->iv_loglevel = atoi(argv[index]); + } + } else if (0 == strcmp(arg, "-jport")) { + if (++index < argc) { + this->iv_javaport = atoi(argv[index]); + } + } else if (0 == strcmp(arg, "-d")) { + if (++index < argc) { + this->iv_datapath = argv[index]; + } + } else if (0 == strcmp(arg, "-e")) { + if (++index < argc) { + this->iv_errThreshhold = atoi(argv[index]); + } + } else if (0 == strcmp(arg, "-w")) { + if (++index < argc) { + this->iv_errWindow = atoi(argv[index]); + } + } else if (0 == strcmp(arg, "-user")) { + if (++index < argc) { + this->iv_user = argv[index]; + } + } else if (0 == strcmp(arg, "-pw")) { + if (++index < argc) { + this->iv_password = argv[index]; + } + } else if (0 == strcmp(arg, "-a")) { + if (++index < argc) { if (stricmp(argv[index],"true")==0) { - this->iv_terminateOnCPCError = true; + this->iv_terminateOnCPCError = true; } else { this->iv_terminateOnCPCError=false; } - } - } else { - if (this->iv_aeDescriptor.length() == 0) { - this->iv_aeDescriptor = argv[index]; - } else { - if (this->iv_queueName.length() == 0) { - this->iv_queueName = argv[index]; - } else { - cerr << "unexpected argument " << argv[index] << endl; - } - } - } - } - //This is a temporary fix to enable the service wrapper to - //connect to the WAS messaging engine. - if (this->iv_mqHost.find_first_of(":") != string::npos) { - this->iv_wpmEndpoints = iv_mqHost; - this->iv_mqHost = ""; - this->iv_wpmBusname = this->iv_mqQueueMgr; - this->iv_mqQueueMgr = ""; - } - } + } + } else { + if (this->iv_aeDescriptor.length() == 0) { + this->iv_aeDescriptor = argv[index]; + } else { + if (this->iv_queueName.length() == 0) { + this->iv_queueName = argv[index]; + } else { + cerr << "unexpected argument " << argv[index] << endl; + } + } + } + } + } string getServiceName() { - if (getWPMBusname().length() == 0) { - stringstream str; - str << getMQPort(); - return getQueueName() + "@" + getMQHost() + ":" + str.str(); - } else { - return getQueueName() + "@" + getWPMBusname(); - } + return getQueueName() + "@" + getBrokerURL(); } - - private: - - string iv_aeDescriptor; - - //ActiveMQ - string iv_brokerURL; - - string iv_queueName; - int iv_numInstances; - int iv_prefetchSize; + + private: + string iv_aeDescriptor; + string iv_brokerURL; + string iv_queueName; + int iv_prefetchSize; + int iv_numInstances; int iv_javaport; string iv_datapath; - int iv_loglevel; + int iv_loglevel; int iv_tracelevel; int iv_errThreshhold; int iv_errWindow; bool iv_terminateOnCPCError; - //Websphere MQ - string iv_mqHost; - int iv_mqPort; - string iv_mqChannel; - string iv_mqQueueMgr; string iv_user; string iv_password; size_t iv_initialfsheapsize; - //Websphere default messaging - string iv_wpmEndpoints; - string iv_wpmBusname; }; -/** - * Command line parameters required to start this service. - */ +/* SocketLogger */ +class SocketLogger : public uima::Logger { + private: + apr_socket_t * socket; + apr_thread_mutex_t *mutex; + + public: + SocketLogger(apr_socket_t * sock, apr_pool_t * pool) : socket(sock) { + apr_thread_mutex_create(&mutex,APR_THREAD_MUTEX_UNNESTED, pool); + } + + virtual void log(uima::LogStream::EnEntryType entrytype, + string classname, + string methodname, + string message, + long lUserCode) { + //cout << "SocketLogger::log() " << message << endl; + apr_thread_mutex_lock(mutex); + + apr_status_t rv; + stringstream str; + str << entrytype << " " << classname << " " << methodname; + if (entrytype == uima::LogStream::EnMessage) { + if (lUserCode != 0) { + str << " RC=" << lUserCode; + } + } else { + str << " RC=" << lUserCode; + } + str << " " << message << endl; + + apr_size_t len = str.str().length(); + rv = apr_socket_send(socket, str.str().c_str(), &len); + + if (rv != APR_SUCCESS) { + cerr << "apr_socket_send() failed " << str.str() << endl; + } + apr_thread_mutex_unlock(mutex); + } + + //used by Monitor to send status messages + void log(std::string message) { + apr_thread_mutex_lock(mutex); + stringstream str; + str << message << endl; + apr_size_t len = str.str().length(); + apr_status_t rv = apr_socket_send(socket, str.str().c_str(), &len); + + if (rv != APR_SUCCESS) { + cerr << "apr_socket_send() failed " << str.str() << endl; + } + apr_thread_mutex_unlock(mutex); + } + }; + class Monitor { - public: - //Constructor + public: + //Constructor - Monitor (apr_pool_t * pool, string brokerURL, - string queueName, string aeDesc, - int numInstances, int prefetchSize, int processCasErrorThreshhold, - int processCasErrorWindow, bool terminateOnCPCError) + Monitor (apr_pool_t * pool, string brokerURL, + string queueName, string aeDesc, + int numInstances, int prefetchSize, int processCasErrorThreshhold, + int processCasErrorWindow, bool terminateOnCPCError, + SocketLogger * logger) { - iv_brokerURL = brokerURL; - iv_queueName = queueName; - iv_aeDescriptor = aeDesc; - iv_numInstances = numInstances; - iv_prefetchSize = 0; - iv_cpcErrors = 0; - iv_getmetaErrors = 0; - iv_processcasErrors = 0; - iv_deserTime = 0; - iv_serTime = 0; - iv_annotatorTime = 0; - iv_startTime = apr_time_now(); - iv_numCasProcessed = 0; - iv_errorThreshhold = processCasErrorThreshhold; - iv_errorWindow = processCasErrorWindow; - iv_terminateOnCPCError = terminateOnCPCError; - iv_getmetaId = -1; - iv_numRunning = 0; - mutex = 0; - lmutex = 0; - cond = 0; - cond_mutex = 0; - apr_status_t rv = apr_thread_mutex_create(&mutex,APR_THREAD_MUTEX_UNNESTED, pool); - apr_thread_mutex_create(&cond_mutex,APR_THREAD_MUTEX_UNNESTED, pool); - apr_thread_mutex_create(&lmutex,APR_THREAD_MUTEX_UNNESTED, pool); - apr_thread_cond_create(&cond, pool); - } - - void print(); - - void shutdown() { + iv_status = "Initializing"; + iv_brokerURL = brokerURL; + iv_queueName = queueName; + iv_aeDescriptor = aeDesc; + iv_numInstances = numInstances; + iv_prefetchSize = 0; + iv_cpcErrors = 0; + iv_getmetaErrors = 0; + iv_processcasErrors = 0; + for (int i=0; i < numInstances;i++) { + iv_idleTimes[i] = 0; + } + iv_deserTime = 0; + iv_serTime = 0; + iv_annotatorTime = 0; + iv_sendTime = 0; + iv_startTime = apr_time_now(); + iv_numCasProcessed = 0; + iv_errorThreshhold = processCasErrorThreshhold; + iv_errorWindow = processCasErrorWindow; + iv_terminateOnCPCError = terminateOnCPCError; + iv_getmetaId = -1; + iv_numRunning = 0; + iv_pLogger = logger; + mutex = 0; + lmutex = 0; + cond = 0; + cond_mutex = 0; + apr_status_t rv = apr_thread_mutex_create(&mutex,APR_THREAD_MUTEX_UNNESTED, pool); + apr_thread_mutex_create(&cond_mutex,APR_THREAD_MUTEX_UNNESTED, pool); + apr_thread_mutex_create(&lmutex,APR_THREAD_MUTEX_UNNESTED, pool); + apr_thread_cond_create(&cond, pool); + } + + void print(); + + void shutdown() { //apr_thread_mutex_unlock(mutex); // apr_thread_mutex_unlock(lmutex); apr_thread_cond_signal(this->cond); return ; } - - const string & getBrokerURL() const { - return iv_brokerURL; - } - - const string & getQueueName() const { - return iv_queueName; - } - - const string & getAEDescriptor() const { - return iv_aeDescriptor; - } + + const string & getBrokerURL() const { + return iv_brokerURL; + } + + const string & getQueueName() const { + return iv_queueName; + } + + const string & getAEDescriptor() const { + return iv_aeDescriptor; + } const int getNumberOfInstances() const { - return iv_numInstances; - } - - void setBrokerURL(const string url) { - iv_brokerURL = url; - } - - void setQueueName(const string qname) { - iv_queueName = qname; - } - - void setAEDescriptor(const string loc) { - iv_aeDescriptor = loc; - } - - void setNumberOfInstances(int num) { - iv_numInstances=num; + return iv_numInstances; + } + + void setBrokerURL(const string url) { + iv_brokerURL = url; + } + + void setQueueName(const string qname) { + iv_queueName = qname; + } + + void setAEDescriptor(const string loc) { + iv_aeDescriptor = loc; + } + + void setNumberOfInstances(int num) { + iv_numInstances=num; iv_numRunning = num; - } + } void listenerStopped(int id) { cout << "listener stopped " << id << endl; @@ -469,150 +440,272 @@ } void setStartTime() { - apr_thread_mutex_lock(mutex); + apr_thread_mutex_lock(mutex); iv_startTime = apr_time_now(); - apr_thread_mutex_unlock(mutex); + apr_thread_mutex_unlock(mutex); + } + + apr_time_t getServiceStartTime() { + return iv_startTime; } void logMessage(string message) { - apr_thread_mutex_lock(lmutex); - uima::ResourceManager::getInstance().getLogger().logMessage(message); - apr_thread_mutex_unlock(lmutex); + apr_thread_mutex_lock(lmutex); + uima::ResourceManager::getInstance().getLogger().logMessage(message); + apr_thread_mutex_unlock(lmutex); } void logWarning(string message) { - apr_thread_mutex_lock(lmutex); - uima::ResourceManager::getInstance().getLogger().logWarning(message); - apr_thread_mutex_unlock(lmutex); + apr_thread_mutex_lock(lmutex); + uima::ResourceManager::getInstance().getLogger().logWarning(message); + apr_thread_mutex_unlock(lmutex); } void logError(string message) { - apr_thread_mutex_lock(lmutex); - uima::ResourceManager::getInstance().getLogger().logError(message,-99); - apr_thread_mutex_unlock(lmutex); - } - - void processingComplete(int command, bool success, apr_time_t totalTime, - apr_time_t deserTime=0, - apr_time_t analyticTime=0, - apr_time_t serTime=0) { + apr_thread_mutex_lock(lmutex); + uima::ResourceManager::getInstance().getLogger().logError(message,-99); + apr_thread_mutex_unlock(lmutex); + } + + void reconnecting(int id) { apr_thread_mutex_lock(mutex); - iv_messageProcessTime += totalTime; - if (command == PROCESS_CAS_COMMAND) { - iv_numCasProcessed++; - iv_deserTime += deserTime; - iv_annotatorTime += analyticTime; - iv_serTime += serTime; - } else if (command == GET_META_COMMAND) { - iv_getmetaTime += totalTime; - } else if (command == CPC_COMMAND) { - iv_cpcTime += totalTime; - } - - if (!success) { - incrementErrorCount(command); + if (id == this->iv_getmetaId) { + iv_status = "Reconnecting"; + if (iv_pLogger != 0) + iv_pLogger->log("StatusReconnecting"); } - apr_thread_mutex_unlock(mutex); + apr_thread_mutex_unlock(mutex); } - + + void reconnectionSuccess(int id) { + apr_thread_mutex_lock(mutex); + if (id == this->iv_getmetaId ) { + iv_status = "Running"; + if (iv_pLogger != 0) + iv_pLogger->log("StatusReconnectionSuccess"); + } + apr_thread_mutex_unlock(mutex); + } + + void running(int id){ + apr_thread_mutex_lock(mutex); + if (id == this->iv_getmetaId) { + iv_status = "Running"; + } + apr_thread_mutex_unlock(mutex); + } + + //Called when a processing of a PROCESSCAS or CPC request is started. + //It records the processing start time and accumulates the idle time + //for this message processing instance. + void processingStarted(int id, apr_time_t startTime, apr_time_t idleTime) { + apr_thread_mutex_lock(mutex); + if (id != iv_getmetaId) { + //cout << "processingStarted idleTime=" << idleTime << endl; + iv_processingStarted[id] = startTime; //record start time + if (idleTime > 0) { //convert to millis + iv_idleTimes[id] += (idleTime/1000); + } else { + iv_idleTimes[id] = 0; + } + } + apr_thread_mutex_unlock(mutex); + } + + //Called when handling of a PROCESSCAS or CPC request is finished. + //Records the timing data and error counts. + //Resets the processingStarted and processingCompleted timestamps + //for this instance. + //All times are in microseconds. + void processingComplete(int id, int command, bool success, + apr_time_t totalTime, + apr_time_t deserTime=0, + apr_time_t analyticTime=0, + apr_time_t serTime=0, + apr_time_t sendTime=0) { + apr_thread_mutex_lock(mutex); + iv_messageProcessTime += totalTime; + iv_sendTime += sendTime; + if (command == PROCESS_CAS_COMMAND) { + iv_numCasProcessed++; + iv_deserTime += deserTime; + iv_annotatorTime += analyticTime; + iv_serTime += serTime; + } else if (command == GET_META_COMMAND) { + iv_getmetaTime += totalTime; + } else if (command == CPC_COMMAND) { + iv_cpcTime += totalTime; + } + + if (!success) { + incrementErrorCount(command); + } + if (id != iv_getmetaId) { + iv_processingStarted[id] = 0; + iv_processingComplete[id] = apr_time_now(); + } + apr_thread_mutex_unlock(mutex); + } + + //Takes a snapshot of the statistics and writes it to the + //socket. This is invoked when the UimacppServiceControllerMBean + //requests statistics for this service. void writeStatistics(apr_socket_t * cs) { - apr_thread_mutex_lock(mutex); - stringstream str; - apr_time_t totalProcessTime = (apr_time_now() - iv_startTime)*iv_numInstances; - - // idle time in millis - long idleTime = (totalProcessTime-iv_messageProcessTime)/1000; - str << "NUMINSTANCES=" << iv_numInstances - << " CPCERRORS=" << iv_cpcErrors - << " GETMETAERRORS=" << iv_getmetaErrors - << " PROCESSCASERRORS=" << iv_processcasErrors - << " CPCTIME=" << iv_cpcTime/1000 - << " GETMETATIME=" << iv_getmetaTime/1000 - << " NUMCASPROCESSED=" << iv_numCasProcessed - << " SERIALIZETIME=" << iv_serTime/1000 - << " DESERIALIZETIME=" << iv_deserTime/1000 - << " ANNOTATORTIME=" << iv_annotatorTime/1000 - << " MESSAGEPROCESSTIME=" << iv_messageProcessTime/1000 - << " IDLETIME=" << idleTime << endl; - - apr_size_t len = str.str().length(); - apr_status_t rv = apr_socket_send(cs, str.str().c_str(), &len); - len = 1; - apr_socket_send(cs,"\n", &len); - if (rv != APR_SUCCESS) { - //TODO throw exception - cout << "apr_socket_send() failed " << endl; - } - - apr_thread_mutex_unlock(mutex); + apr_thread_mutex_lock(mutex); + stringstream str; + getStatistics(str); + apr_size_t len = str.str().length(); + apr_status_t rv = apr_socket_send(cs, str.str().c_str(), &len); + len = 1; + apr_socket_send(cs,"\n", &len); + if (rv != APR_SUCCESS) { + //TODO throw exception + cout << "apr_socket_send() failed " << endl; + } + cout << "ThreadId: " << apr_os_thread_current() << " writeStatistics() " << str.str() << endl; + apr_thread_mutex_unlock(mutex); + } + + + //Collects and formats statistics data. + void getStatistics(stringstream & str) { + apr_time_t snapshotTime = apr_time_now(); + apr_time_t totalProcessTime = (snapshotTime - iv_startTime)*iv_numInstances; + //accumulate idle times recorded by instances as they processed + //requests. If an instance has never recorded idle time, + //then it has not processed any requests and idle time for that + //instance is computed from time the service was started. + INT64 idleTime = 0; + for (int i=0; i < iv_numInstances; i++) { + if (iv_idleTimes[i] == 0 ) { + INT64 idleT = (snapshotTime - iv_startTime)/1000; //is this right ?? + if (idleT > 0) { + idleTime += idleT; + } + } else { + if (iv_idleTimes[i] > 0) { + idleTime += iv_idleTimes[i]; + } + } + } + //cerr << "IDLETIME recorded by instances " << idleTime << endl; + + //account for instances that have processed requests but are + //currently idle + map::iterator ite; + for (ite=iv_processingStarted.begin();ite != iv_processingStarted.end();ite++) { + if (ite->second == 0) { + INT64 idleT = 0; + if (iv_processingComplete[ite->first] > 0) { + idleT = (snapshotTime - iv_processingComplete[ite->first])/1000; + if (idleT > 0) { + //cerr << "currently idle " << ite->first << " for " << idleT << endl; + idleTime += idleT; + } + } + } + } + + //cerr << "IDLETIME after accounting for current idle instances " << idleTime << endl; + //Format string. Each statistics represented as name=value pairs. + //Each pair separated by a space. i + //Convert all times to millis. + str << "NUMINSTANCES=" << iv_numInstances + << " CPCERRORS=" << iv_cpcErrors + << " GETMETAERRORS=" << iv_getmetaErrors + << " PROCESSCASERRORS=" << iv_processcasErrors + << " CPCTIME=" << iv_cpcTime/1000 + << " GETMETATIME=" << iv_getmetaTime/1000 + << " NUMCASPROCESSED=" << iv_numCasProcessed + << " SERIALIZETIME=" << iv_serTime/iv_numInstances/1000 + << " DESERIALIZETIME=" << iv_deserTime/iv_numInstances/1000 + << " ANNOTATORTIME=" << iv_annotatorTime/iv_numInstances/1000 + << " SENDTIME=" << iv_sendTime/iv_numInstances/1000 + << " MESSAGEPROCESSTIME=" << iv_messageProcessTime/iv_numInstances/1000 + << " IDLETIME=" << idleTime/iv_numInstances << " SERVICEUPTIME=" << totalProcessTime/iv_numInstances/1000 + << " STATUS=" << iv_status << " SERVICESTARTTIME=" << iv_startTime + << " SNAPSHOTTIME=" << snapshotTime << endl; + + //cout << apr_os_thread_current() << " write statistics " << str.str() << endl; } void reset() { - apr_thread_mutex_lock(mutex); - - // idle time in millis - this->iv_numCasProcessed=0; - this->iv_annotatorTime=0; - this->iv_deserTime=0; - this->iv_serTime=0; - this->iv_cpcTime=0; - this->iv_getmetaTime=0; - this->iv_messageProcessTime=0; - - this->iv_startTime=apr_time_now(); - - this->iv_cpcErrors=0; - this->iv_getmetaErrors=0; - this->iv_processcasErrors=0; - - apr_thread_mutex_unlock(mutex); + apr_thread_mutex_lock(mutex); + + this->iv_numCasProcessed=0; + this->iv_annotatorTime=0; + this->iv_deserTime=0; + this->iv_serTime=0; + this->iv_cpcTime=0; + this->iv_getmetaTime=0; + this->iv_sendTime=0; + this->iv_messageProcessTime=0; + + this->iv_startTime=apr_time_now(); + + this->iv_cpcErrors=0; + this->iv_getmetaErrors=0; + this->iv_processcasErrors=0; + this->iv_processingStarted.clear(); + this->iv_processingComplete.clear(); + for (int i=0; i < iv_numInstances; i++) { + iv_idleTimes[i] =0; + } + apr_thread_mutex_unlock(mutex); } - + void printStatistics(); -public: + public: apr_thread_mutex_t *cond_mutex; apr_thread_cond_t *cond; -private: - apr_thread_mutex_t *mutex; + private: + apr_thread_mutex_t *mutex; apr_thread_mutex_t *lmutex; - string iv_brokerURL; - string iv_queueName; + SocketLogger * iv_pLogger; + string iv_brokerURL; + string iv_queueName; string iv_aeDescriptor; - int iv_numInstances; + string iv_status; + int iv_numInstances; int iv_prefetchSize; - long iv_cpcErrors; - long iv_getmetaErrors; - long iv_processcasErrors; + + INT64 iv_cpcErrors; + INT64 iv_getmetaErrors; + INT64 iv_processcasErrors; + int iv_getmetaId; + /////apr_os_thread_t iv_getmetaThreadId; + map iv_processingStarted; + map iv_processingComplete; + map iv_idleTimes; int iv_numRunning; apr_time_t iv_deserTime; apr_time_t iv_serTime; apr_time_t iv_annotatorTime; + apr_time_t iv_sendTime; apr_time_t iv_messageProcessTime; apr_time_t iv_cpcTime; apr_time_t iv_getmetaTime; apr_time_t iv_startTime; - - long iv_numCasProcessed; + + INT64 iv_numCasProcessed; int iv_errorThreshhold; int iv_errorWindow; bool iv_terminateOnCPCError; void incrementErrorCount(int command) { - if (command == PROCESS_CAS_COMMAND) { iv_processcasErrors++; if (this->iv_errorThreshhold > 0 ) { - if ( this->iv_errorWindow == 0 && iv_processcasErrors >= - this->iv_errorThreshhold) { - cerr << " number of PROCESSCAS errors exceeded the threshhold. Terminating the service." << endl; - shutdown(); - } else { - //TODO sliding window - - } + if ( this->iv_errorWindow == 0 && iv_processcasErrors >= + this->iv_errorThreshhold) { + cerr << " number of PROCESSCAS errors exceeded the threshhold. Terminating the service." << endl; + shutdown(); + } else { + //TODO sliding window + } } } else if (command == GET_META_COMMAND) { iv_getmetaErrors++; @@ -626,242 +719,181 @@ } } } - -}; - - -/* SocketLogger */ -class SocketLogger : public uima::Logger { - private: - apr_socket_t * socket; - apr_thread_mutex_t *mutex; - - public: - SocketLogger(apr_socket_t * sock, apr_pool_t * pool) : socket(sock) { - apr_thread_mutex_create(&mutex,APR_THREAD_MUTEX_UNNESTED, pool); - } - - virtual void log(uima::LogStream::EnEntryType entrytype, - string classname, - string methodname, - string message, - long lUserCode) { - //cout << "SocketLogger::log() " << message << endl; - apr_thread_mutex_lock(mutex); - - apr_status_t rv; - stringstream str; - str << entrytype << " " << classname << " " << methodname; - if (entrytype == uima::LogStream::EnMessage) { - if (lUserCode != 0) { - str << " RC=" << lUserCode; - } - } else { - str << " RC=" << lUserCode; - } - str << " " << message << endl; - - apr_size_t len = str.str().length(); - rv = apr_socket_send(socket, str.str().c_str(), &len); - - if (rv != APR_SUCCESS) { - //TODO throw exception - cerr << "apr_socket_send() failed " << endl; - } - apr_thread_mutex_unlock(mutex); - } - -}; + }; - - static SocketLogger * singleton_pLogger =0; - static Monitor * singleton_pMonitor=0; - //static ServiceParameters serviceDesc; - - - //static apr_pool_t *pool=0; - static apr_socket_t *s=0; //logger socket - static apr_socket_t *cs=0; //receive commands socket - static apr_sockaddr_t *sa=0; - static apr_status_t rv=0; - - ///static apr_thread_t *thread=0; - ///static apr_threadattr_t *thd_attr=0; - - static void signal_handler(int signum) { - stringstream str; - str << __FILE__ << __LINE__ << " Received Signal: " << signum; - cerr << str.str() << endl; - singleton_pMonitor->shutdown(); +static SocketLogger * singleton_pLogger =0; +static Monitor * singleton_pMonitor=0; +static apr_socket_t *s=0; //logger socket +static apr_socket_t *cs=0; //receive commands socket +static apr_sockaddr_t *sa=0; +static apr_status_t rv=0; + +static void signal_handler(int signum) { + stringstream str; + str << __FILE__ << __LINE__ << " Received Signal: " << signum; + cerr << str.str() << endl; + singleton_pMonitor->shutdown(); +} + + +static int terminateService() { + + cout << "deployCppService::terminateService" << endl; + + if (cs) { + apr_socket_close(cs); + cs=0; } + if (singleton_pMonitor) { + delete singleton_pMonitor; + singleton_pMonitor=0; + } - static int terminateService() { - - cout << "deployCppService::terminateService" << endl; - - if (cs) { - apr_socket_close(cs); - cs=0; - } - - if (singleton_pMonitor) { - delete singleton_pMonitor; - singleton_pMonitor=0; - } - - if (singleton_pLogger) { - uima::ResourceManager::getInstance().unregisterLogger(singleton_pLogger); - delete singleton_pLogger; - singleton_pLogger =0; - } + if (singleton_pLogger) { + uima::ResourceManager::getInstance().unregisterLogger(singleton_pLogger); + delete singleton_pLogger; + singleton_pLogger =0; + } - if (s) { + if (s) { apr_socket_close(s); s=0; - } - return 0; - } - - - static int initialize(ServiceParameters & serviceDesc, apr_pool_t * pool) { - - if (serviceDesc.getLoggingLevel() == 0) { - uima::ResourceManager::getInstance().setLoggingLevel(uima::LogStream::EnMessage); - } else if (serviceDesc.getLoggingLevel() == 1) { - uima::ResourceManager::getInstance().setLoggingLevel(uima::LogStream::EnWarning); - } else if (serviceDesc.getLoggingLevel() == 2) { - uima::ResourceManager::getInstance().setLoggingLevel(uima::LogStream::EnError); - } - - /*use only first path that exists */ - if (serviceDesc.getDataPath().length() > 0) { - uima::util::Filename dataPath(""); - dataPath.determinePath(serviceDesc.getDataPath().c_str()); - uima::util::Location dataLocation(dataPath.getAsCString()); - uima::ResourceManager::getInstance().setNewLocationData(dataLocation); - } - - //register signal handler - apr_signal(SIGINT, signal_handler); - apr_signal(SIGTERM, signal_handler); - - /* create object to collect JMX stats */ - singleton_pMonitor = new Monitor(pool, - serviceDesc.getBrokerURL(),serviceDesc.getQueueName(), - serviceDesc.getAEDescriptor(), serviceDesc.getNumberOfInstances(), - serviceDesc.getPrefetchSize(), - serviceDesc.getErrorThreshhold(), - serviceDesc.getErrorWindow(), - serviceDesc.terminatOnCPCError()); - - /* set up connection to Java controller bean if a port is specified */ - int javaport=serviceDesc.getJavaPort(); - if (javaport > 0) { - //cout << "connecting to java controller port " << javaport << endl; - rv = apr_sockaddr_info_get(&sa, "localhost", APR_INET, javaport, 0, pool); - if (rv != APR_SUCCESS) { - cerr << "ERROR: apr_sockaddr_info_get localhost at port " << javaport << endl; - return -2; - } + } + return 0; +} - rv = apr_socket_create(&s, sa->family, SOCK_STREAM, APR_PROTO_TCP, pool); - if (rv != APR_SUCCESS) { - cerr << "ERROR: apr_socket_create() logger connection at port " << javaport << endl; - return -3; - } - rv = apr_socket_connect(s, sa); - if (rv != APR_SUCCESS) { - cerr << "ERROR: apr_socket_connect() logger connection at port " << javaport << endl; - return -4; - } +static int initialize(ServiceParameters & serviceDesc, apr_pool_t * pool) { - //commands connection - rv = apr_socket_create(&cs, sa->family, SOCK_STREAM, APR_PROTO_TCP, pool); - if (rv != APR_SUCCESS) { - cerr << "ERROR: apr_socket_create() commands connection at port " << javaport << endl; - return -5; - } + if (serviceDesc.getLoggingLevel() == 0) { + uima::ResourceManager::getInstance().setLoggingLevel(uima::LogStream::EnMessage); + } else if (serviceDesc.getLoggingLevel() == 1) { + uima::ResourceManager::getInstance().setLoggingLevel(uima::LogStream::EnWarning); + } else if (serviceDesc.getLoggingLevel() == 2) { + uima::ResourceManager::getInstance().setLoggingLevel(uima::LogStream::EnError); + } - rv = apr_socket_connect(cs, sa); - if (rv != APR_SUCCESS) { - cerr << "ERROR: apr_socket_connect() commands connection at port " << javaport << endl; - return -6; - } + /*use only first path that exists */ + if (serviceDesc.getDataPath().length() > 0) { + uima::util::Filename dataPath(""); + dataPath.determinePath(serviceDesc.getDataPath().c_str()); + uima::util::Location dataLocation(dataPath.getAsCString()); + uima::ResourceManager::getInstance().setNewLocationData(dataLocation); + } - //register SocketLogger - singleton_pLogger = new SocketLogger(s,pool); - if (singleton_pLogger == NULL) { - cerr << "ERROR: SocketLogger() failed. " << endl; - return -7; - } - uima::ResourceManager::getInstance().registerLogger(singleton_pLogger); - cout << "deployCppService::initialize() Registered Java Logger." << endl; - } - - return 0; - } //initialize + //register signal handler + apr_signal(SIGINT, signal_handler); + apr_signal(SIGTERM, signal_handler); + + /* set up connection to Java controller bean if a port is specified */ + int javaport=serviceDesc.getJavaPort(); + if (javaport > 0) { + //cout << "connecting to java controller port " << javaport << endl; + rv = apr_sockaddr_info_get(&sa, "localhost", APR_INET, javaport, 0, pool); + if (rv != APR_SUCCESS) { + cerr << "ERROR: apr_sockaddr_info_get localhost at port " << javaport << endl; + return -2; + } + + rv = apr_socket_create(&s, sa->family, SOCK_STREAM, APR_PROTO_TCP, pool); + if (rv != APR_SUCCESS) { + cerr << "ERROR: apr_socket_create() logger connection at port " << javaport << endl; + return -3; + } + + rv = apr_socket_connect(s, sa); + if (rv != APR_SUCCESS) { + cerr << "ERROR: apr_socket_connect() logger connection at port " << javaport << endl; + return -4; + } + + //commands connection + rv = apr_socket_create(&cs, sa->family, SOCK_STREAM, APR_PROTO_TCP, pool); + if (rv != APR_SUCCESS) { + cerr << "ERROR: apr_socket_create() commands connection at port " << javaport << endl; + return -5; + } + + rv = apr_socket_connect(cs, sa); + if (rv != APR_SUCCESS) { + cerr << "ERROR: apr_socket_connect() commands connection at port " << javaport << endl; + return -6; + } + + //register SocketLogger + singleton_pLogger = new SocketLogger(s,pool); + if (singleton_pLogger == NULL) { + cerr << "ERROR: SocketLogger() failed. " << endl; + return -7; + } + uima::ResourceManager::getInstance().registerLogger(singleton_pLogger); + } + /* create object to collect JMX stats */ + singleton_pMonitor = new Monitor(pool, + serviceDesc.getBrokerURL(),serviceDesc.getQueueName(), + serviceDesc.getAEDescriptor(), serviceDesc.getNumberOfInstances(), + serviceDesc.getPrefetchSize(), + serviceDesc.getErrorThreshhold(), + serviceDesc.getErrorWindow(), + serviceDesc.terminatOnCPCError(), + singleton_pLogger); + return 0; +} //initialize static void* APR_THREAD_FUNC handleCommands(apr_thread_t *thd, void *data) { - //if we are here service initialization was successful, send message - //to controller. - string msg = "0"; - apr_size_t len = msg.length(); - rv = apr_socket_send(cs, msg.c_str(), &len); - len = 1; - apr_socket_send(cs,"\n", &len); - //receive JMX, admin requests from controller - char buf[9]; - memset(buf,0,9); - len = 8; - while ( (rv = apr_socket_recv(cs, buf, &len)) != APR_EOF) { - string command = buf; - memset(buf,0,9); - len=8; - //cout << "apr_socket_recv command=" << command << endl; - if (command.compare("GETSTATS")==0) { - //singleton_pLogger->log(LogStream::EnMessage,"deployCppService","getStats","retrieving stats",0); - singleton_pMonitor->writeStatistics(cs); - //singleton_pLogger->log(LogStream::EnMessage,"deployCppService","getStats","sent statistics",0); - } else if (command.compare("RESET")==0) { - singleton_pLogger->log(uima::LogStream::EnMessage,"deployCppService", "RESET", - "reset JMX statistics",0); - singleton_pMonitor->reset(); - } else if (command.compare("SHUTDOWN")==0) { - singleton_pMonitor->shutdown(); - break; - } else { - if (rv != APR_SUCCESS) { - singleton_pMonitor->shutdown(); - break; - } else { - char * c = new char[256]; - apr_strerror(rv, c, 255); - stringstream str; - str << c; - str << "deployCppService::handleCommand() aprerror=" << rv << " invalid command=" << command; - cerr << str.str() << endl; - delete c; - } - } - } - apr_thread_exit(thd, APR_SUCCESS); - cout << "deployCppService::handleCommand() calling shutdown. " << endl; + //if we are here service initialization was successful, send message + //to controller. + string msg = "0"; + apr_size_t len = msg.length(); + rv = apr_socket_send(cs, msg.c_str(), &len); + len = 1; + apr_socket_send(cs,"\n", &len); + cout << "sent 0 to controller " << endl; + //receive JMX, admin requests from controller + char buf[9]; + memset(buf,0,9); + len = 8; + while ( (rv = apr_socket_recv(cs, buf, &len)) != APR_EOF) { + string command = buf; + memset(buf,0,9); + len=8; + //cout << "apr_socket_recv command=" << command << endl; + if (command.compare("GETSTATS")==0) { + //singleton_pLogger->log(LogStream::EnMessage,"deployCppService","getStats","retrieving stats",0); + singleton_pMonitor->writeStatistics(cs); + //singleton_pLogger->log(LogStream::EnMessage,"deployCppService","getStats","sent statistics",0); + } else if (command.compare("RESET")==0) { + singleton_pLogger->log(uima::LogStream::EnMessage,"deployCppService", "RESET", + "reset JMX statistics",0); + singleton_pMonitor->reset(); + } else if (command.compare("SHUTDOWN")==0) { singleton_pMonitor->shutdown(); - return NULL; - } - - - - - + break; + } else { + if (rv != APR_SUCCESS) { + singleton_pMonitor->shutdown(); + break; + } else { + char * c = new char[256]; + apr_strerror(rv, c, 255); + stringstream str; + str << c; + str << "deployCppService::handleCommand() aprerror=" << rv << " invalid command=" << command; + cerr << str.str() << endl; + delete c; + } + } + } + apr_thread_exit(thd, APR_SUCCESS); + cout << "deployCppService::handleCommand() calling shutdown. " << endl; + singleton_pMonitor->shutdown(); + return NULL; +} #endif - -