airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From scnakand...@apache.org
Subject [07/51] [partial] airavata-php-gateway git commit: removing files from wrong merge
Date Tue, 13 Dec 2016 22:09:20 GMT
http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.cpp
deleted file mode 100644
index b9553c4..0000000
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.cpp
+++ /dev/null
@@ -1,1567 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#define __STDC_FORMAT_MACROS
-
-#include <thrift/thrift-config.h>
-
-#include <thrift/server/TNonblockingServer.h>
-#include <thrift/concurrency/Exception.h>
-#include <thrift/transport/TSocket.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
-#include <thrift/transport/PlatformSocket.h>
-
-#include <iostream>
-
-#ifdef HAVE_SYS_SOCKET_H
-#include <sys/socket.h>
-#endif
-
-#ifdef HAVE_NETINET_IN_H
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#endif
-
-#ifdef HAVE_ARPA_INET_H
-#include <arpa/inet.h>
-#endif
-
-#ifdef HAVE_NETDB_H
-#include <netdb.h>
-#endif
-
-#ifdef HAVE_FCNTL_H
-#include <fcntl.h>
-#endif
-
-#include <assert.h>
-
-#ifdef HAVE_SCHED_H
-#include <sched.h>
-#endif
-
-#ifndef AF_LOCAL
-#define AF_LOCAL AF_UNIX
-#endif
-
-#if !defined(PRIu32)
-#define PRIu32 "I32u"
-#define PRIu64 "I64u"
-#endif
-
-namespace apache { namespace thrift { namespace server {
-
-using namespace apache::thrift::protocol;
-using namespace apache::thrift::transport;
-using namespace apache::thrift::concurrency;
-using namespace std;
-using apache::thrift::transport::TSocket;
-using apache::thrift::transport::TTransportException;
-using boost::shared_ptr;
-
-/// Three states for sockets: recv frame size, recv data, and send mode.
-/// TConnection::workSocket() switches on this value to decide what to do
-/// with the socket the next time libevent reports activity on it.
-enum TSocketState {
-  /// Reading the 4-byte frame-size header
-  SOCKET_RECV_FRAMING,
-  /// Reading the frame payload into the connection's read buffer
-  SOCKET_RECV,
-  /// Writing the pending response out of the connection's write buffer
-  SOCKET_SEND
-};
-
-/**
- * Six states for the nonblocking server:
- *  1) initialize
- *  2) read 4 byte frame size
- *  3) read frame of data
- *  4) wait for a task handed to the thread manager to finish
- *  5) send back data (if any)
- *  6) force immediate connection close
- */
-enum TAppState {
-  APP_INIT,
-  APP_READ_FRAME_SIZE,
-  APP_READ_REQUEST,
-  APP_WAIT_TASK,
-  APP_SEND_RESULT,
-  APP_CLOSE_CONNECTION
-};
-
-/**
- * Represents a connection that is handled via libevent. This connection
- * essentially encapsulates a socket that has some associated libevent state.
- *
- * Instances are recycled: TNonblockingServer::createConnection() calls
- * init() again on a pooled object when it is reused for a new client
- * socket, so init() is responsible for resetting all per-client state.
- */
-class TNonblockingServer::TConnection {
- private:
-  /// Server IO Thread handling this connection
-  TNonblockingIOThread* ioThread_;
-
-  /// Server handle
-  TNonblockingServer* server_;
-
-  /// TProcessor
-  boost::shared_ptr<TProcessor> processor_;
-
-  /// Object wrapping network socket
-  boost::shared_ptr<TSocket> tSocket_;
-
-  /// Libevent object
-  struct event event_;
-
-  /// Libevent flags currently registered for event_ (0 means not registered)
-  short eventFlags_;
-
-  /// Socket mode
-  TSocketState socketState_;
-
-  /// Application state
-  TAppState appState_;
-
-  /// How much data needed to read. While the frame-size header is still
-  /// incomplete, workSocket() also parks the partially received header
-  /// bytes in this field.
-  uint32_t readWant_;
-
-  /// Where in the read buffer are we
-  uint32_t readBufferPos_;
-
-  /// Read buffer. Grown with std::realloc in transition() and released with
-  /// free(), so it must only ever hold malloc-family storage (or NULL).
-  uint8_t* readBuffer_;
-
-  /// Read buffer size
-  uint32_t readBufferSize_;
-
-  /// Write buffer. Set from outputTransport_->getBuffer() in transition();
-  /// it is never freed by this class.
-  uint8_t* writeBuffer_;
-
-  /// Write buffer size
-  uint32_t writeBufferSize_;
-
-  /// How far through writing are we?
-  uint32_t writeBufferPos_;
-
-  /// Largest size of write buffer seen since buffer was constructed
-  size_t largestWriteBufferSize_;
-
-  /// Count of the number of calls for use with getResizeBufferEveryN().
-  int32_t callsForResize_;
-
-  /// Task handle
-  /// NOTE(review): no use of this member is visible in this part of the file.
-  int taskHandle_;
-
-  /// Task event
-  /// NOTE(review): no use of this member is visible in this part of the file.
-  struct event taskEvent_;
-
-  /// Transport to read from
-  boost::shared_ptr<TMemoryBuffer> inputTransport_;
-
-  /// Transport that processor writes to
-  boost::shared_ptr<TMemoryBuffer> outputTransport_;
-
-  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
-  boost::shared_ptr<TTransport> factoryInputTransport_;
-  boost::shared_ptr<TTransport> factoryOutputTransport_;
-
-  /// Protocol decoder
-  boost::shared_ptr<TProtocol> inputProtocol_;
-
-  /// Protocol encoder
-  boost::shared_ptr<TProtocol> outputProtocol_;
-
-  /// Server event handler, if any
-  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
-
-  /// Thrift call context, if any
-  void *connectionContext_;
-
-  /// Go into read mode
-  void setRead() {
-    setFlags(EV_READ | EV_PERSIST);
-  }
-
-  /// Go into write mode
-  void setWrite() {
-    setFlags(EV_WRITE | EV_PERSIST);
-  }
-
-  /// Set socket idle
-  void setIdle() {
-    setFlags(0);
-  }
-
-  /**
-   * Set event flags for this connection.
-   *
-   * @param eventFlags flags we pass to libevent for the connection.
-   */
-  void setFlags(short eventFlags);
-
-  /**
-   * Libevent handler called (via our static wrapper) when the connection
-   * socket had something happen.  Rather than use the flags libevent passed,
-   * we use the connection state to determine whether we need to read or
-   * write the socket.
-   */
-  void workSocket();
-
- public:
-
-  class Task;
-
-  /**
-   * Constructor. Creates the objects that live as long as the TConnection
-   * itself (memory transports and the TSocket wrapper) and then delegates
-   * the per-client setup to init().
-   *
-   * @param socket   descriptor of the accepted client socket.
-   * @param ioThread IO thread that will service this connection.
-   * @param addr     peer address as filled in by accept().
-   * @param addrLen  length of addr.
-   */
-  TConnection(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
-              const sockaddr* addr, socklen_t addrLen) {
-    readBuffer_ = NULL;
-    readBufferSize_ = 0;
-
-    ioThread_ = ioThread;
-    server_ = ioThread->getServer();
-
-    // Allocate input and output transports. These only need to be allocated
-    // once per TConnection (they don't need to be reallocated on init() call).
-    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
-    outputTransport_.reset(
-      new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
-    tSocket_.reset(new TSocket());
-    init(socket, ioThread, addr, addrLen);
-  }
-
-  /// Releases the read buffer; everything else is held by shared_ptr members.
-  ~TConnection() {
-    std::free(readBuffer_);
-  }
-
-  /// Close this connection and free or reset its resources.
-  void close();
-
-  /**
-   * Check buffers against any size limits and shrink it if exceeded.
-   *
-   * @param readLimit we reduce read buffer size to this (if nonzero).
-   * @param writeLimit if nonzero and write buffer is larger, replace it.
-   */
-  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
-
-  /// Initialize
-  void init(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
-            const sockaddr* addr, socklen_t addrLen);
-
-  /**
-   * This is called when the application transitions from one state into
-   * another. This means that it has finished writing the data that it needed
-   * to, or finished receiving the data that it needed to.
-   */
-  void transition();
-
-  /**
-   * C-callable event handler for connection events.  Provides a callback
-   * that libevent can understand which invokes connection_->workSocket().
-   *
-   * @param fd the descriptor the event occurred on.
-   * @param which the flags associated with the event.
-   * @param v void* callback arg where we placed TConnection's "this".
-   */
-  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
-    // Sanity check only; compiled out when NDEBUG is defined.
-    assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
-    ((TConnection*)v)->workSocket();
-  }
-
-  /**
-   * Notification to server that processing has ended on this request.
-   * Can be called either when processing is completed or when a waiting
-   * task has been preemptively terminated (on overload).
-   *
-   * Don't call this from the IO thread itself.
-   *
-   * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
-   */
-  bool notifyIOThread() {
-    return ioThread_->notify(this);
-  }
-
-  /*
-   * Returns the number of this connection's currently assigned IO
-   * thread.
-   */
-  int getIOThreadNumber() const {
-    return ioThread_->getThreadNumber();
-  }
-
-  /**
-   * Force connection shutdown for this connection. Marks the connection as
-   * APP_CLOSE_CONNECTION and notifies its IO thread.
-   *
-   * @throws TException if the IO thread could not be notified.
-   */
-  void forceClose() {
-    appState_ = APP_CLOSE_CONNECTION;
-    if (!notifyIOThread()) {
-      throw TException("TConnection::forceClose: failed write on notify pipe");
-    }
-  }
-
-  /// return the server this connection was initialized for.
-  TNonblockingServer* getServer() const {
-    return server_;
-  }
-
-  /// get state of connection.
-  TAppState getState() const {
-    return appState_;
-  }
-
-  /// return the TSocket transport wrapping this network connection
-  boost::shared_ptr<TSocket> getTSocket() const {
-    return tSocket_;
-  }
-
-  /// return the server event handler if any
-  boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
-    return serverEventHandler_;
-  }
-
-  /// return the Thrift connection context if any
-  void* getConnectionContext() {
-    return connectionContext_;
-  }
-
-};
-
-/**
- * Unit of work submitted to the thread manager for one connection: runs the
- * processor over the request already buffered for the connection and then
- * tells the connection's IO thread that the result is ready.
- */
-class TNonblockingServer::TConnection::Task: public Runnable {
- public:
-  /**
-   * @param processor  processor that handles the request.
-   * @param input      protocol the request is decoded from.
-   * @param output     protocol the response is encoded to.
-   * @param connection connection this task works on behalf of; its event
-   *                   handler and context are captured here.
-   */
-  Task(boost::shared_ptr<TProcessor> processor,
-       boost::shared_ptr<TProtocol> input,
-       boost::shared_ptr<TProtocol> output,
-       TConnection* connection)
-    : processor_(processor),
-      input_(input),
-      output_(output),
-      connection_(connection),
-      serverEventHandler_(connection->getServerEventHandler()),
-      connectionContext_(connection->getConnectionContext()) {}
-
-  /**
-   * Process requests for as long as the processor succeeds and the input
-   * transport still has data, then notify the IO thread.
-   *
-   * @throws TException if the IO thread could not be notified.
-   */
-  void run() {
-    try {
-      bool moreWork = true;
-      while (moreWork) {
-        if (serverEventHandler_) {
-          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
-        }
-        // peek() is only consulted when process() reported success
-        moreWork = processor_->process(input_, output_, connectionContext_)
-                   && input_->getTransport()->peek();
-      }
-    } catch (const TTransportException& transportError) {
-      GlobalOutput.printf("TNonblockingServer: client died: %s", transportError.what());
-    } catch (const bad_alloc&) {
-      // Out of memory is treated as fatal for the whole process
-      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
-      exit(1);
-    } catch (const std::exception& error) {
-      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
-                          typeid(error).name(), error.what());
-    } catch (...) {
-      GlobalOutput.printf(
-        "TNonblockingServer: unknown exception while processing.");
-    }
-
-    // Hand the finished request back to the libevent thread
-    const bool notified = connection_->notifyIOThread();
-    if (!notified) {
-      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
-    }
-  }
-
-  /// Connection this task belongs to.
-  TConnection* getTConnection() {
-    return connection_;
-  }
-
- private:
-  boost::shared_ptr<TProcessor> processor_;
-  boost::shared_ptr<TProtocol> input_;
-  boost::shared_ptr<TProtocol> output_;
-  TConnection* connection_;
-  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
-  void* connectionContext_;
-};
-
-/**
- * (Re)initialise this connection for a newly accepted client socket. Resets
- * the state machine and write-side bookkeeping and rebuilds the transports,
- * protocols, connection context and processor. The read buffer allocation
- * is left as it is.
- */
-void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
-                                           TNonblockingIOThread* ioThread,
-                                           const sockaddr* addr,
-                                           socklen_t addrLen) {
-  // Owning thread and server: the factories below are fetched from server_
-  ioThread_ = ioThread;
-  server_ = ioThread->getServer();
-
-  // Point the socket wrapper at the new descriptor and peer address
-  tSocket_->setSocketFD(socket);
-  tSocket_->setCachedAddress(addr, addrLen);
-
-  // State machine starts from scratch with no libevent registration
-  appState_ = APP_INIT;
-  socketState_ = SOCKET_RECV_FRAMING;
-  eventFlags_ = 0;
-  callsForResize_ = 0;
-
-  // Read-side progress
-  readWant_ = 0;
-  readBufferPos_ = 0;
-
-  // Write-side progress
-  writeBuffer_ = NULL;
-  writeBufferPos_ = 0;
-  writeBufferSize_ = 0;
-  largestWriteBufferSize_ = 0;
-
-  // Wrap the memory buffers with whatever the transport factories produce
-  factoryInputTransport_ =
-    server_->getInputTransportFactory()->getTransport(inputTransport_);
-  factoryOutputTransport_ =
-    server_->getOutputTransportFactory()->getTransport(outputTransport_);
-
-  // Protocols on top of the wrapped transports
-  inputProtocol_ =
-    server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
-  outputProtocol_ =
-    server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
-
-  // A connection context exists only when the server has an event handler
-  serverEventHandler_ = server_->getEventHandler();
-  connectionContext_ = NULL;
-  if (serverEventHandler_) {
-    connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
-                                                            outputProtocol_);
-  }
-
-  // Finally, the processor for this connection
-  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
-}
-
-/**
- * Perform one non-blocking read or write on the connection socket, chosen
- * by socketState_, and call transition() once the current step (frame
- * header, frame body, or response) is complete. Transport errors and remote
- * disconnects close the connection.
- */
-void TNonblockingServer::TConnection::workSocket() {
-  int got=0, left=0, sent=0;
-  uint32_t fetch = 0;
-
-  switch (socketState_) {
-  case SOCKET_RECV_FRAMING:
-    // Lets the 4 header bytes be filled in one at a time and then read back
-    // as a single 32-bit value.
-    union {
-      uint8_t buf[sizeof(uint32_t)];
-      uint32_t size;
-    } framing;
-
-    // if we've already received some bytes we kept them here
-    // (readWant_ doubles as storage for a partially received header, and
-    // readBufferPos_ counts how many header bytes have arrived so far)
-    framing.size = readWant_;
-    // determine size of this frame
-    try {
-      // Read from the socket
-      fetch = tSocket_->read(&framing.buf[readBufferPos_],
-                             uint32_t(sizeof(framing.size) - readBufferPos_));
-      if (fetch == 0) {
-        // Whenever we get here it means a remote disconnect
-        close();
-        return;
-      }
-      readBufferPos_ += fetch;
-    } catch (TTransportException& te) {
-      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
-      close();
-
-      return;
-    }
-
-    if (readBufferPos_ < sizeof(framing.size)) {
-      // more needed before frame size is known -- save what we have so far
-      readWant_ = framing.size;
-      return;
-    }
-
-    // Header complete: convert from network byte order to the real length
-    readWant_ = ntohl(framing.size);
-    if (readWant_ > server_->getMaxFrameSize()) {
-      // Don't allow giant frame sizes.  This prevents bad clients from
-      // causing us to try and allocate a giant buffer.
-      GlobalOutput.printf("TNonblockingServer: frame size too large "
-                          "(%" PRIu32 " > %" PRIu64 ") from client %s. "
-                          "Remote side not using TFramedTransport?",
-                          readWant_,
-                          (uint64_t)server_->getMaxFrameSize(),
-                          tSocket_->getSocketInfo().c_str());
-      close();
-      return;
-    }
-    // size known; now get the rest of the frame
-    transition();
-    return;
-
-  case SOCKET_RECV:
-    // It is an error to be in this state if we already have all the data
-    // NOTE(review): a frame size of 0 passes the max-size check in the
-    // framing state, so readWant_ can be 0 here and this assert would fire
-    // in debug builds -- confirm how zero-length frames should be handled.
-    assert(readBufferPos_ < readWant_);
-
-    try {
-      // Read from the socket
-      fetch = readWant_ - readBufferPos_;
-      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
-    }
-    catch (TTransportException& te) {
-      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
-      close();
-
-      return;
-    }
-
-    if (got > 0) {
-      // Move along in the buffer
-      readBufferPos_ += got;
-
-      // Check that we did not overdo it
-      assert(readBufferPos_ <= readWant_);
-
-      // We are done reading, move onto the next state
-      if (readBufferPos_ == readWant_) {
-        transition();
-      }
-      return;
-    }
-
-    // Whenever we get down here it means a remote disconnect
-    close();
-
-    return;
-
-  case SOCKET_SEND:
-    // Should never have position past size
-    assert(writeBufferPos_ <= writeBufferSize_);
-
-    // If there is no data to send, then let us move on
-    if (writeBufferPos_ == writeBufferSize_) {
-      GlobalOutput("WARNING: Send state with no data to send\n");
-      transition();
-      return;
-    }
-
-    try {
-      // Write whatever the socket accepts right now; the remainder is sent
-      // on a later call.
-      left = writeBufferSize_ - writeBufferPos_;
-      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
-    }
-    catch (TTransportException& te) {
-      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
-      close();
-      return;
-    }
-
-    writeBufferPos_ += sent;
-
-    // Did we overdo it?
-    assert(writeBufferPos_ <= writeBufferSize_);
-
-    // We are done!
-    if (writeBufferPos_ == writeBufferSize_) {
-      transition();
-    }
-
-    return;
-
-  default:
-    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
-    assert(0);
-  }
-}
-
-/**
- * This is called when the application transitions from one state into
- * another. This means that it has finished writing the data that it needed
- * to, or finished receiving the data that it needed to.
- *
- * Several cases deliberately fall through into the next one; those spots
- * are marked in the body. Any path that calls close() returns immediately
- * afterwards, because close() hands this object back to the server, which
- * may delete it.
- *
- * @throws std::bad_alloc if the read buffer cannot be grown.
- */
-void TNonblockingServer::TConnection::transition() {
-  // ensure this connection is active right now
-  assert(ioThread_);
-  assert(server_);
-
-  // Switch upon the state that we are currently in and move to a new state
-  switch (appState_) {
-
-  case APP_READ_REQUEST:
-    // We are done reading the request, package the read buffer into transport
-    // and get back some data from the dispatch function
-    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
-    outputTransport_->resetBuffer();
-    // Prepend four bytes of blank space to the buffer so we can
-    // write the frame size there later.
-    outputTransport_->getWritePtr(4);
-    outputTransport_->wroteBytes(4);
-
-    server_->incrementActiveProcessors();
-
-    if (server_->isThreadPoolProcessing()) {
-      // We are setting up a Task to do this work and we will wait on it
-
-      // Create task and dispatch to the thread manager
-      boost::shared_ptr<Runnable> task =
-        boost::shared_ptr<Runnable>(new Task(processor_,
-                                             inputProtocol_,
-                                             outputProtocol_,
-                                             this));
-      // The application is now waiting on the task to finish
-      appState_ = APP_WAIT_TASK;
-
-      try {
-        server_->addTask(task);
-      } catch (IllegalStateException & ise) {
-        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
-        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
-        // The task was never queued, so nothing will come back through
-        // APP_WAIT_TASK to undo the incrementActiveProcessors() above.
-        server_->decrementActiveProcessors();
-        close();
-        // This object may already have been deleted by returnConnection();
-        // do not fall through to setIdle() below.
-        return;
-      }
-
-      // Set this connection idle so that libevent doesn't process more
-      // data on it while we're still waiting for the threadmanager to
-      // finish this task
-      setIdle();
-      return;
-    } else {
-      try {
-        if (serverEventHandler_) {
-          serverEventHandler_->processContext(connectionContext_,
-                                              getTSocket());
-        }
-        // Invoke the processor
-        processor_->process(inputProtocol_, outputProtocol_,
-                            connectionContext_);
-      } catch (const TTransportException &ttx) {
-        GlobalOutput.printf("TNonblockingServer transport error in "
-                            "process(): %s", ttx.what());
-        server_->decrementActiveProcessors();
-        close();
-        return;
-      } catch (const std::exception &x) {
-        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
-                            typeid(x).name(), x.what());
-        server_->decrementActiveProcessors();
-        close();
-        return;
-      } catch (...) {
-        GlobalOutput.printf("Server::process() unknown exception");
-        server_->decrementActiveProcessors();
-        close();
-        return;
-      }
-    }
-
-    // Intentionally fall through here, the call to process has written into
-    // the writeBuffer_
-
-  case APP_WAIT_TASK:
-    // We have now finished processing a task and the result has been written
-    // into the outputTransport_, so we grab its contents and place them into
-    // the writeBuffer_ for actual writing by the libevent thread
-
-    server_->decrementActiveProcessors();
-    // Get the result of the operation
-    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
-
-    // If the function call generated return data, then move into the send
-    // state and get going
-    // 4 bytes were reserved for frame size
-    if (writeBufferSize_ > 4) {
-
-      // Move into write state
-      writeBufferPos_ = 0;
-      socketState_ = SOCKET_SEND;
-
-      // Put the frame size into the write buffer
-      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
-      memcpy(writeBuffer_, &frameSize, 4);
-
-      // Socket into write mode
-      appState_ = APP_SEND_RESULT;
-      setWrite();
-
-      // Try to work the socket immediately
-      // workSocket();
-
-      return;
-    }
-
-    // In this case, the request was oneway and we should fall through
-    // right back into the read frame header state
-    goto LABEL_APP_INIT;
-
-  case APP_SEND_RESULT:
-    // it's now safe to perform buffer size housekeeping.
-    if (writeBufferSize_ > largestWriteBufferSize_) {
-      largestWriteBufferSize_ = writeBufferSize_;
-    }
-    if (server_->getResizeBufferEveryN() > 0
-        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
-      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
-                              server_->getIdleWriteBufferLimit());
-      callsForResize_ = 0;
-    }
-
-    // N.B.: We also intentionally fall through here into the INIT state!
-
-  LABEL_APP_INIT:
-  case APP_INIT:
-
-    // Clear write buffer variables
-    writeBuffer_ = NULL;
-    writeBufferPos_ = 0;
-    writeBufferSize_ = 0;
-
-    // Into read4 state we go
-    socketState_ = SOCKET_RECV_FRAMING;
-    appState_ = APP_READ_FRAME_SIZE;
-
-    readBufferPos_ = 0;
-
-    // Register read event
-    setRead();
-
-    // Try to work the socket right away
-    // workSocket();
-
-    return;
-
-  case APP_READ_FRAME_SIZE:
-    // We just read the request length
-    // Double the buffer size until it is big enough
-    if (readWant_ > readBufferSize_) {
-      // Compute the new size in a local so that readBufferSize_ keeps
-      // describing the real allocation if realloc fails below.
-      uint32_t newSize = (readBufferSize_ == 0) ? 1 : readBufferSize_;
-      while (readWant_ > newSize) {
-        if (newSize >= 0x80000000u) {
-          // Doubling once more would wrap to 0 and never terminate; settle
-          // for exactly the requested size instead.
-          newSize = readWant_;
-          break;
-        }
-        newSize *= 2;
-      }
-
-      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
-      if (newBuffer == NULL) {
-        // nothing else to be done...
-        throw std::bad_alloc();
-      }
-      readBuffer_ = newBuffer;
-      readBufferSize_ = newSize;
-    }
-
-    readBufferPos_= 0;
-
-    // Move into read request state
-    socketState_ = SOCKET_RECV;
-    appState_ = APP_READ_REQUEST;
-
-    // Work the socket right away
-    // workSocket();
-
-    return;
-
-  case APP_CLOSE_CONNECTION:
-    server_->decrementActiveProcessors();
-    close();
-    return;
-
-  default:
-    GlobalOutput.printf("Unexpected Application State %d", appState_);
-    assert(0);
-  }
-}
-
-/**
- * Change which libevent events this connection's socket is registered for.
- *
- * Any existing registration is removed first; passing 0 therefore leaves
- * the connection unregistered (idle). Failures of event_del/event_add are
- * logged, not thrown.
- *
- * @param eventFlags libevent flags (EV_READ, EV_WRITE, EV_PERSIST, ...) or 0.
- */
-void TNonblockingServer::TConnection::setFlags(short eventFlags) {
-  // Requested flags already in effect: nothing to do
-  if (eventFlags == eventFlags_) {
-    return;
-  }
-
-  // Drop the current registration, if there is one, before changing it
-  const bool registered = (eventFlags_ != 0);
-  if (registered && event_del(&event_) == -1) {
-    GlobalOutput("TConnection::setFlags event_del");
-    return;
-  }
-
-  // Remember the new flags; zero means "stay unregistered"
-  eventFlags_ = eventFlags;
-  if (eventFlags_ == 0) {
-    return;
-  }
-
-  // Prepare event_ to invoke eventHandler(this) for the requested events on
-  // our socket, attach it to the owning IO thread's event base, and
-  // register it.
-  event_set(&event_,
-            tSocket_->getSocketFD(),
-            eventFlags_,
-            TConnection::eventHandler,
-            this);
-  event_base_set(ioThread_->getEventBase(), &event_);
-
-  if (event_add(&event_, 0) == -1) {
-    GlobalOutput("TConnection::setFlags(): could not event_add");
-  }
-}
-
-/**
- * Tear down this connection: unregister it from libevent, release the
- * connection context, close the socket and factory transports, and finally
- * return the object to the owning server.
- */
-void TNonblockingServer::TConnection::close() {
-  // Unregister from libevent; a failure is reported but does not stop the
-  // rest of the teardown
-  const int delStatus = event_del(&event_);
-  if (delStatus == -1) {
-    GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR);
-  }
-
-  // Let the event handler dispose of the per-connection context
-  if (serverEventHandler_) {
-    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
-  }
-
-  // No IO thread is associated with this connection any more
-  ioThread_ = NULL;
-
-  // Shut the network socket, then the transports built on top of it
-  tSocket_->close();
-  factoryInputTransport_->close();
-  factoryOutputTransport_->close();
-
-  // Must stay last: the server either pools or deletes this object
-  server_->returnConnection(this);
-}
-
-/**
- * Release oversized buffers held by this connection.
- *
- * @param readLimit  if nonzero and the read buffer is larger, the read
- *                   buffer is freed.
- * @param writeLimit if nonzero and the largest write buffer seen is larger,
- *                   the output transport is reset to the server's default
- *                   write buffer size.
- */
-void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
-    size_t readLimit,
-    size_t writeLimit) {
-  const bool readTooBig = (readLimit > 0) && (readBufferSize_ > readLimit);
-  const bool writeTooBig =
-    (writeLimit > 0) && (largestWriteBufferSize_ > writeLimit);
-
-  if (readTooBig) {
-    // Drop the buffer entirely
-    free(readBuffer_);
-    readBuffer_ = NULL;
-    readBufferSize_ = 0;
-  }
-
-  if (writeTooBig) {
-    // Start over with a default-sized output buffer
-    const uint32_t defaultSize =
-      static_cast<uint32_t>(server_->getWriteBufferDefaultSize());
-    outputTransport_->resetBuffer(defaultSize);
-    largestWriteBufferSize_ = 0;
-  }
-}
-
-/**
- * Closes every active connection, deletes all pooled connection objects and
- * breaks the IO-thread/Thread reference cycles.
- */
-TNonblockingServer::~TNonblockingServer() {
-  // close() returns each connection to the server, which removes it from
-  // activeConnections_, so this loop drains the container
-  while (!activeConnections_.empty()) {
-    activeConnections_.front()->close();
-  }
-
-  // Destroy the idle TConnection objects kept for reuse
-  while (!connectionStack_.empty()) {
-    TConnection* pooled = connectionStack_.top();
-    connectionStack_.pop();
-    delete pooled;
-  }
-
-  // Each TNonblockingIOThread holds a shared_ptr to its Thread, and the
-  // Thread holds a shared_ptr back to the TNonblockingIOThread (as its
-  // runnable). Clearing the thread pointer breaks that cycle so both can be
-  // deallocated.
-  while (!ioThreads_.empty()) {
-    boost::shared_ptr<TNonblockingIOThread> ioThread = ioThreads_.back();
-    ioThreads_.pop_back();
-    ioThread->setThread(boost::shared_ptr<Thread>());
-  }
-}
-
-/**
- * Hands out a TConnection for a newly accepted socket, reusing a pooled
- * object when one is available and allocating a new one otherwise. The
- * connection is assigned to an IO thread in round-robin order and recorded
- * as active.
- *
- * @param socket  accepted client socket.
- * @param addr    peer address.
- * @param addrLen length of addr.
- * @return the initialised connection.
- */
-TNonblockingServer::TConnection* TNonblockingServer::createConnection(
-    THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen) {
-  // Guards the pool, the active list and the round-robin cursor
-  Guard g(connMutex_);
-
-  // Round robin: take the current IO thread, then advance the cursor
-  assert(nextIOThread_ < ioThreads_.size());
-  int chosenIdx = nextIOThread_;
-  TNonblockingIOThread* ioThread = ioThreads_[chosenIdx].get();
-  nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();
-
-  TConnection* conn = NULL;
-  if (!connectionStack_.empty()) {
-    // Recycle an idle object and re-initialise it for this client
-    conn = connectionStack_.top();
-    connectionStack_.pop();
-    conn->init(socket, ioThread, addr, addrLen);
-  } else {
-    // Pool is empty: allocate a fresh object
-    conn = new TConnection(socket, ioThread, addr, addrLen);
-    ++numTConnections_;
-  }
-
-  activeConnections_.push_back(conn);
-  return conn;
-}
-
-/**
- * Takes back a connection that is no longer in use. It is removed from the
- * active list and then either pooled for reuse or, when the pool has
- * reached its configured limit, deleted.
- *
- * @param connection the connection being returned; may be deleted here.
- */
-void TNonblockingServer::returnConnection(TConnection* connection) {
-  Guard g(connMutex_);
-
-  // Forget the connection as an active one
-  activeConnections_.erase(
-    std::remove(activeConnections_.begin(), activeConnections_.end(), connection),
-    activeConnections_.end());
-
-  // A limit of 0 means the pool is unbounded
-  const bool poolFull =
-    connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_);
-
-  if (!poolFull) {
-    // Trim oversized buffers before parking the object in the pool
-    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
-    connectionStack_.push(connection);
-    return;
-  }
-
-  delete connection;
-  --numTConnections_;
-}
-
-/**
- * Server socket had something happen.  We accept all waiting client
- * connections on fd and assign TConnection objects to handle those requests.
- *
- * The accept loop stops early (returning from the function) when a
- * connection is dropped because of overload, when the client socket cannot
- * be switched to non-blocking mode, or when no TConnection could be
- * obtained.
- *
- * @param fd    the listening socket libevent reported activity on.
- * @param which libevent event flags (unused).
- */
-void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
-  (void) which;
-  // Make sure that libevent didn't mess up the socket handles
-  assert(fd == serverSocket_);
-
-  // Server socket accepted a new connection
-  socklen_t addrLen;
-  sockaddr_storage addrStorage;
-  sockaddr* addrp = (sockaddr*)&addrStorage;
-  addrLen = sizeof(addrStorage);
-
-  // Going to accept a new client socket
-  THRIFT_SOCKET clientSocket;
-
-  // Accept as many new clients as possible, even though libevent signaled only
-  // one, this helps us to avoid having to go back into the libevent engine so
-  // many times
-  while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
-    // If we're overloaded, take action here
-    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
-      Guard g(connMutex_);
-      nConnectionsDropped_++;
-      nTotalConnectionsDropped_++;
-      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
-        ::THRIFT_CLOSESOCKET(clientSocket);
-        return;
-      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
-        if (!drainPendingTask()) {
-          // Nothing left to discard, so we drop connection instead.
-          ::THRIFT_CLOSESOCKET(clientSocket);
-          return;
-        }
-      }
-    }
-
-    // Explicitly set this socket to NONBLOCK mode
-    int flags;
-    if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0 ||
-        THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
-      GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ", THRIFT_GET_SOCKET_ERROR);
-      ::THRIFT_CLOSESOCKET(clientSocket);
-      return;
-    }
-
-    // Create a new TConnection for this client socket.
-    TConnection* clientConnection =
-      createConnection(clientSocket, addrp, addrLen);
-
-    // Fail fast if we could not create a TConnection object
-    if (clientConnection == NULL) {
-      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
-      ::THRIFT_CLOSESOCKET(clientSocket);
-      return;
-    }
-
-    /*
-     * Either notify the ioThread that is assigned this connection to
-     * start processing, or if it is us, we'll just ask this
-     * connection to do its initial state change here.
-     *
-     * (We need to avoid writing to our own notification pipe, to
-     * avoid possible deadlocks if the pipe is full.)
-     *
-     * The IO thread #0 is the only one that handles these listen
-     * events, so unless the connection has been assigned to thread #0
-     * we know it's not on our thread.
-     */
-    if (clientConnection->getIOThreadNumber() == 0) {
-      clientConnection->transition();
-    } else if (!clientConnection->notifyIOThread()) {
-      // The owning IO thread never heard about this connection, so it will
-      // not be serviced. Report it instead of failing silently.
-      // NOTE(review): the connection is deliberately not closed here --
-      // TConnection::close() calls event_del() on an event that has not
-      // been set up yet for a fresh connection. Confirm the right cleanup.
-      GlobalOutput.perror("thriftServerEventHandler: notifyIOThread() failed for new connection ",
-                          THRIFT_GET_SOCKET_ERROR);
-    }
-
-    // addrLen is written by the accept() call, so needs to be set before the next call.
-    addrLen = sizeof(addrStorage);
-  }
-
-
-  // Done looping accept, now we have to make sure the error is due to
-  // blocking. Any other error is a problem
-  if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
-    GlobalOutput.perror("thriftServerEventHandler: accept() ", THRIFT_GET_SOCKET_ERROR);
-  }
-}
-
-/**
- * Creates the server's listening socket, binds it to port_ on the wildcard
- * address and hands it to listenSocket() to finish the setup.
- *
- * An IPv6 result from getaddrinfo() is preferred (ipv4 addresses can be
- * mapped into ipv6 space); if there is none, the last result is used.
- *
- * @throws TException if port_ cannot be formatted, or if getaddrinfo() or
- *         socket() fails.
- * @throws TTransportException (NOT_OPEN) if bind() fails.
- */
-void TNonblockingServer::createAndListenOnSocket() {
-  THRIFT_SOCKET s;
-
-  struct addrinfo hints, *res, *res0;
-  int error;
-
-  char port[sizeof("65536") + 1];
-  memset(&hints, 0, sizeof(hints));
-  hints.ai_family = PF_UNSPEC;
-  hints.ai_socktype = SOCK_STREAM;
-  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
-
-  // port_ is a plain int, so use a bounded write: a value with more digits
-  // than the buffer holds must be rejected instead of overrunning the stack.
-  const int portLen = snprintf(port, sizeof(port), "%d", port_);
-  if (portLen < 0 || static_cast<size_t>(portLen) >= sizeof(port)) {
-    throw TException("TNonblockingServer::serve() invalid port");
-  }
-
-  // Wildcard address
-  error = getaddrinfo(NULL, port, &hints, &res0);
-  if (error) {
-    throw TException("TNonblockingServer::serve() getaddrinfo " +
-                     string(THRIFT_GAI_STRERROR(error)));
-  }
-
-  // Pick the ipv6 address first since ipv4 addresses can be mapped
-  // into ipv6 space.
-  for (res = res0; res; res = res->ai_next) {
-    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
-      break;
-  }
-
-  // Create the server socket
-  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
-  if (s == -1) {
-    freeaddrinfo(res0);
-    throw TException("TNonblockingServer::serve() socket() -1");
-  }
-
-  #ifdef IPV6_V6ONLY
-  if (res->ai_family == AF_INET6) {
-    // Clear IPV6_V6ONLY; a failure here is only logged.
-    int zero = 0;
-    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
-      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
-    }
-  }
-  #endif // #ifdef IPV6_V6ONLY
-
-
-  int one = 1;
-
-  // Set THRIFT_NO_SOCKET_CACHING to avoid 2MSL delay on server restart
-  setsockopt(s, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, const_cast_sockopt(&one), sizeof(one));
-
-  if (::bind(s, res->ai_addr, static_cast<int>(res->ai_addrlen)) == -1) {
-    // Save the bind() error before cleaning up: closing the socket and
-    // freeing the address list may overwrite the last socket error.
-    const int errno_copy = THRIFT_GET_SOCKET_ERROR;
-    ::THRIFT_CLOSESOCKET(s);
-    freeaddrinfo(res0);
-    throw TTransportException(TTransportException::NOT_OPEN,
-                              "TNonblockingServer::serve() bind",
-                              errno_copy);
-  }
-
-  // Done with the addr info
-  freeaddrinfo(res0);
-
-  // Set up this file descriptor for listening
-  listenSocket(s);
-}
-
-/**
- * Takes a bound socket (createAndListenOnSocket() passes one in), applies
- * the server's socket options, starts listening on it and stores it as
- * serverSocket_.
- *
- * @param s the socket to listen on; it is closed here if setup fails.
- * @throws TException if the socket cannot be made non-blocking or if
- *         listen() fails.
- */
-void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
-  // Non-blocking mode is mandatory; either fcntl step failing is fatal.
-  const int currentFlags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0);
-  if (currentFlags < 0 ||
-      THRIFT_FCNTL(s, THRIFT_F_SETFL, currentFlags | THRIFT_O_NONBLOCK) < 0) {
-    ::THRIFT_CLOSESOCKET(s);
-    throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
-  }
-
-  // The option calls below are best effort: their results are not checked.
-  int enable = 1;
-  struct linger lingerOff = {0, 0};
-
-  // Keepalive to ensure full result flushing
-  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&enable), sizeof(enable));
-
-  // Linger disabled so that sockets do not hang
-  setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&lingerOff), sizeof(lingerOff));
-
-  // TCP_NODELAY is skipped when TCP_NOPUSH is defined (MAC OS X hack), see
-  // http://lists.danga.com/pipermail/memcached/2005-March/001240.html
-  #ifndef TCP_NOPUSH
-  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&enable), sizeof(enable));
-  #endif
-
-  #ifdef TCP_LOW_MIN_RTO
-  if (TSocket::getUseLowMinRto()) {
-    setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&enable), sizeof(enable));
-  }
-  #endif
-
-  const int listenResult = listen(s, LISTEN_BACKLOG);
-  if (listenResult == -1) {
-    ::THRIFT_CLOSESOCKET(s);
-    throw TException("TNonblockingServer::serve() listen");
-  }
-
-  // Setup succeeded: this is now the server socket.
-  serverSocket_ = s;
-}
-
-/**
- * Stores the thread manager and switches thread pool processing on or off
- * depending on whether one was supplied. A non-null manager also gets
- * expireClose() installed as its expire callback.
- */
-void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
-  threadManager_ = threadManager;
-
-  // No manager: nothing to hook up, just disable thread pool processing.
-  if (!threadManager) {
-    threadPoolProcessing_ = false;
-    return;
-  }
-
-  threadManager->setExpireCallback(
-    apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose,
-                                 this,
-                                 apache::thrift::stdcxx::placeholders::_1));
-  threadPoolProcessing_ = true;
-}
-
-/**
- * Re-evaluates the overload state and returns it.
- *
- * The server becomes overloaded when numActiveProcessors_ exceeds
- * maxActiveProcessors_ or when the number of connections not parked on
- * connectionStack_ exceeds maxConnections_. It leaves the overloaded state
- * only once both figures are at or below overloadHysteresis_ times their
- * limit; at that point the per-episode drop counter is logged and reset.
- *
- * @return the updated value of overloaded_.
- */
-bool  TNonblockingServer::serverOverloaded() {
-  size_t activeConnections = numTConnections_ - connectionStack_.size();
-  if (numActiveProcessors_ > maxActiveProcessors_ ||
-      activeConnections > maxConnections_) {
-    if (!overloaded_) {
-      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
-      overloaded_ = true;
-    }
-  } else {
-    if (overloaded_ &&
-        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
-        (activeConnections <= overloadHysteresis_ * maxConnections_)) {
-      // The counters are fixed-width integers; cast them to the exact types
-      // that %u and %llu expect so the varargs call is well-defined on every
-      // platform.
-      GlobalOutput.printf("TNonblockingServer: overload ended; "
-                          "%u dropped (%llu total)",
-                          static_cast<unsigned int>(nConnectionsDropped_),
-                          static_cast<unsigned long long>(nTotalConnectionsDropped_));
-      nConnectionsDropped_ = 0;
-      overloaded_ = false;
-    }
-  }
-
-  return overloaded_;
-}
-
-/**
- * Removes the next pending task from the thread manager, if there is one,
- * and force-closes the connection that task belongs to.
- *
- * @return true if a task was removed and its connection closed; false if
- *         there is no thread manager or no pending task.
- */
-bool TNonblockingServer::drainPendingTask() {
-  if (!threadManager_) {
-    return false;
-  }
-
-  boost::shared_ptr<Runnable> pending = threadManager_->removeNextPending();
-  if (!pending) {
-    return false;
-  }
-
-  TConnection* victim =
-    static_cast<TConnection::Task*>(pending.get())->getTConnection();
-  assert(victim && victim->getServer()
-         && victim->getState() == APP_WAIT_TASK);
-  victim->forceClose();
-  return true;
-}
-
-/**
- * Expire callback (installed by setThreadManager()): force-closes the
- * connection that owns the given task.
- *
- * @param task the task handed to the callback; it is treated as a
- *        TConnection::Task.
- */
-void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
-  TConnection::Task* expiredTask = static_cast<TConnection::Task*>(task.get());
-  TConnection* owner = expiredTask->getTConnection();
-  assert(owner && owner->getServer() &&
-         owner->getState() == APP_WAIT_TASK);
-  owner->forceClose();
-}
-
-/**
- * Asks every IO thread, in order, to stop so that their event loops end
- * as soon as possible.
- */
-void TNonblockingServer::stop() {
-  typedef std::vector<boost::shared_ptr<TNonblockingIOThread> >::iterator IOThreadIter;
-  for (IOThreadIter it = ioThreads_.begin(); it != ioThreads_.end(); ++it) {
-    (*it)->stop();
-  }
-}
-
-/**
- * Prepares the server to serve: records the optional user event base,
- * creates the listen socket if there is none yet, builds the IO thread
- * objects, fires the preServe event, starts every IO thread except #0 and
- * finally registers the libevent events of IO thread #0 on the calling
- * thread.
- *
- * @param user_event_base event base to store in userEventBase_; serve()
- *        passes NULL.
- */
-void TNonblockingServer::registerEvents(event_base* user_event_base) {
-  userEventBase_ = user_event_base;
-
-  // init listen socket
-  if (serverSocket_ == THRIFT_INVALID_SOCKET)
-    createAndListenOnSocket();
-
-  // set up the IO threads
-  assert(ioThreads_.empty());
-  if (!numIOThreads_) {
-    numIOThreads_ = DEFAULT_IO_THREADS;
-  }
-
-  for (uint32_t id = 0; id < numIOThreads_; ++id) {
-    // the first IO thread also does the listening on server socket
-    THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
-
-    shared_ptr<TNonblockingIOThread> thread(
-      new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
-    ioThreads_.push_back(thread);
-  }
-
-  // Notify handler of the preServe event
-  if (eventHandler_) {
-    eventHandler_->preServe();
-  }
-
-  // Start all of our helper IO threads. Note that the threads run forever,
-  // only terminating if stop() is called.
-  assert(ioThreads_.size() == numIOThreads_);
-  assert(ioThreads_.size() > 0);
-
-  // size() returns a size_t, which must not be passed to %d through
-  // varargs; convert it to int explicitly.
-  GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
-               port_, static_cast<int>(ioThreads_.size()));
-
-  // Launch all the secondary IO threads in separate threads
-  if (ioThreads_.size() > 1) {
-    ioThreadFactory_.reset(new PlatformThreadFactory(
-#if !defined(USE_BOOST_THREAD) && !defined(USE_STD_THREAD)
-      PlatformThreadFactory::OTHER,  // scheduler
-      PlatformThreadFactory::NORMAL, // priority
-      1,                          // stack size (MB)
-#endif
-      false                       // detached
-    ));
-
-    assert(ioThreadFactory_.get());
-
-    // intentionally starting at thread 1, not 0
-    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
-      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
-      ioThreads_[i]->setThread(thread);
-      thread->start();
-    }
-  }
-
-  // Register the events for the primary (listener) IO thread
-  ioThreads_[0]->registerEvents();
-}
-
-/**
- * Entry point of the server: sets everything up through registerEvents(),
- * runs IO thread #0 on the calling thread and, once that returns, joins all
- * IO threads.
- */
-void TNonblockingServer::serve() {
-
-  // No user-supplied event base in this mode.
-  registerEvents(NULL);
-
-  // The calling thread acts as the primary (listener) IO thread; run() only
-  // comes back when the server is shutting down.
-  ioThreads_[0]->run();
-
-  // Do not leave serve() while any IO thread is still running.
-  uint32_t idx = 0;
-  while (idx < ioThreads_.size()) {
-    ioThreads_[idx]->join();
-    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", idx);
-    ++idx;
-  }
-}
-
-/**
- * Builds an IO thread object. No event base and no notification pipe exist
- * yet after construction: eventBase_ is NULL and both pipe descriptors
- * are -1.
- *
- * @param server the owning server.
- * @param number this thread's number.
- * @param listenSocket the socket this thread listens on;
- *        TNonblockingServer::registerEvents() passes THRIFT_INVALID_SOCKET
- *        for every thread except #0.
- * @param useHighPriority whether run() should raise the scheduling priority.
- */
-TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
-                                           int number,
-                                           THRIFT_SOCKET listenSocket,
-                                           bool useHighPriority)
-      : server_(server)
-      , number_(number)
-      , listenSocket_(listenSocket)
-      , useHighPriority_(useHighPriority)
-      , eventBase_(NULL)
-      , ownEventBase_(false) {
-  // Mark both ends of the notification pipe as not opened.
-  for (int end = 0; end < 2; ++end) {
-    notificationPipeFDs_[end] = -1;
-  }
-}
-
-/**
- * Joins the associated thread, frees the event base if this object created
- * it, and closes the listen socket and both notification pipe descriptors
- * when they are open (>= 0). Close failures are only logged.
- */
-TNonblockingIOThread::~TNonblockingIOThread() {
-  // The thread must be completely finished before anything is torn down.
-  join();
-
-  // A user-provided event base is not ours to free.
-  if (ownEventBase_ && eventBase_) {
-    event_base_free(eventBase_);
-    ownEventBase_ = false;
-  }
-
-  if (listenSocket_ >= 0) {
-    const int closeResult = ::THRIFT_CLOSESOCKET(listenSocket_);
-    if (closeResult != 0) {
-      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
-                          THRIFT_GET_SOCKET_ERROR);
-    }
-    listenSocket_ = THRIFT_INVALID_SOCKET;
-  }
-
-  for (int end = 0; end < 2; ++end) {
-    if (notificationPipeFDs_[end] < 0) {
-      continue;  // this end is not open
-    }
-    const int closeResult = ::THRIFT_CLOSESOCKET(notificationPipeFDs_[end]);
-    if (closeResult != 0) {
-      GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
-                          THRIFT_GET_SOCKET_ERROR);
-    }
-    notificationPipeFDs_[end] = THRIFT_INVALID_SOCKET;
-  }
-}
-
-/**
- * Creates the notification socket pair in notificationPipeFDs_ and makes
- * both ends non-blocking and close-on-exec.
- *
- * If a step after the pair was created fails, both descriptors are closed
- * and reset to -1 so that the destructor (which closes every descriptor
- * that is >= 0) does not close them a second time.
- *
- * @throws TException if the pair cannot be created or configured.
- */
-void TNonblockingIOThread::createNotificationPipe() {
-  if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
-    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
-    throw TException("can't create notification pipe");
-  }
-  if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
-     evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
-    for (int j = 0; j < 2; ++j) {
-      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[j]);
-      notificationPipeFDs_[j] = -1;
-    }
-    throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
-  }
-  for (int i = 0; i < 2; ++i) {
-    // Older libevent has no close-on-exec helper, so fall back to fcntl.
-#if LIBEVENT_VERSION_NUMBER < 0x02000000
-    int flags;
-    const bool cloexecFailed =
-      (flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
-      THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0;
-#else
-    const bool cloexecFailed =
-      evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0;
-#endif
-    if (cloexecFailed) {
-      for (int j = 0; j < 2; ++j) {
-        ::THRIFT_CLOSESOCKET(notificationPipeFDs_[j]);
-        notificationPipeFDs_[j] = -1;
-      }
-      throw TException("TNonblockingServer::createNotificationPipe() "
-        "FD_CLOEXEC");
-    }
-  }
-}
-
-/**
- * Register the core libevent events onto the proper base.
- *
- * Records the calling thread in threadId_, takes the server's user event
- * base or creates a new one (owned by this object), registers the listen
- * event when this thread has a listen socket, creates the notification
- * pipe and registers the notification event.
- *
- * @throws TException if the event base cannot be created or if either
- *         event_add() call fails; createNotificationPipe() may throw too.
- */
-void TNonblockingIOThread::registerEvents() {
-  threadId_ = Thread::get_current();
-
-  assert(eventBase_ == 0);
-  eventBase_ = getServer()->getUserEventBase();
-  if (eventBase_ == NULL) {
-    eventBase_ = event_base_new();
-    // event_base_new() reports failure with NULL; stop here rather than
-    // hand a null base to the libevent calls below.
-    if (eventBase_ == NULL) {
-      throw TException("TNonblockingServer::serve(): "
-                       "event_base_new() failed");
-    }
-    ownEventBase_ = true;
-  }
-
-  // Print some libevent stats
-  if (number_ == 0) {
-    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
-            event_get_version(),
-            event_base_get_method(eventBase_));
-  }
-
-  if (listenSocket_ >= 0) {
-    // Register the server event
-    event_set(&serverEvent_,
-              listenSocket_,
-              EV_READ | EV_PERSIST,
-              TNonblockingIOThread::listenHandler,
-              server_);
-    event_base_set(eventBase_, &serverEvent_);
-
-    // Add the event and start up the server
-    if (-1 == event_add(&serverEvent_, 0)) {
-      throw TException("TNonblockingServer::serve(): "
-                       "event_add() failed on server listen event");
-    }
-    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
-                        number_);
-  }
-
-  createNotificationPipe();
-
-  // Create an event to be notified when a task finishes
-  event_set(&notificationEvent_,
-            getNotificationRecvFD(),
-            EV_READ | EV_PERSIST,
-            TNonblockingIOThread::notifyHandler,
-            this);
-
-  // Attach to the base
-  event_base_set(eventBase_, &notificationEvent_);
-
-  // Add the event and start up the server
-  if (-1 == event_add(&notificationEvent_, 0)) {
-    throw TException("TNonblockingServer::serve(): "
-                     "event_add() failed on task-done notification event");
-  }
-  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
-                      number_);
-}
-
-/**
- * Sends the connection pointer value itself over the notification pipe;
- * notifyHandler() reads it back on the other end.
- *
- * @param conn the connection to hand over; breakLoop() passes NULL.
- * @return true only if the send descriptor is valid and the whole pointer
- *         was sent.
- */
-bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
-  const THRIFT_SOCKET sendFd = getNotificationSendFD();
-  if (sendFd < 0) {
-    return false;
-  }
-
-  const int kSize = sizeof(conn);
-  const bool sentWholePointer =
-    send(sendFd, const_cast_sockopt(&conn), kSize, 0) == kSize;
-  return sentWholePointer;
-}
-
-/**
- * libevent callback for the notification pipe. Reads connection pointers
- * from fd until nothing more can be read and calls transition() on each.
- *
- * A NULL pointer is the command to stop the thread and ends the handler.
- * A partial read or a read error other than EWOULDBLOCK/EAGAIN is treated
- * as fatal and reported through breakLoop(true).
- *
- * @param fd the receiving end of the notification pipe.
- * @param which the event flags (unused).
- * @param v the TNonblockingIOThread that registered the event.
- */
-/* static */
-void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
-  TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
-  assert(ioThread);
-  (void)which;
-
-  for (;;) {
-    TNonblockingServer::TConnection* connection = 0;
-    const int kSize = sizeof(connection);
-    const int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
-
-    if (nBytes == kSize) {
-      if (connection == NULL) {
-        // stop command: leave the handler
-        return;
-      }
-      connection->transition();
-      continue;
-    }
-
-    if (nBytes > 0) {
-      // Only part of a pointer arrived; this is not recoverable here.
-      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
-                          nBytes, kSize);
-      ioThread->breakLoop(true);
-      return;
-    }
-
-    if (nBytes == 0) {
-      GlobalOutput.printf("notifyHandler: Notify socket closed!");
-      return;
-    }
-
-    // nBytes < 0: running out of data is the normal way to finish, any
-    // other error is fatal.
-    if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
-      GlobalOutput.perror(
-        "TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
-      ioThread->breakLoop(true);
-    }
-    return;
-  }
-}
-
-/**
- * Makes this thread's event loop exit.
- *
- * @param error when true, the failure is logged and the whole process is
- *        aborted; nothing after that runs.
- */
-void TNonblockingIOThread::breakLoop(bool error) {
-  if (error) {
-    GlobalOutput.printf(
-      "TNonblockingServer: IO thread #%d exiting with error.", number_);
-    // TODO: find a better reaction than killing the whole process.
-    GlobalOutput.printf("TNonblockingServer: aborting process.");
-    ::abort();
-  }
-
-  // Flag the loop to exit on its next event.
-  event_base_loopbreak(eventBase_);
-
-  // The flag is only noticed when the loop wakes up. When called from the
-  // loop's own thread, that thread cannot be blocked inside the loop (and
-  // must not use the notify(0) mechanism), so there is nothing more to do.
-  if (Thread::is_current(threadId_)) {
-    return;
-  }
-
-  // Called from another thread: wake the loop with the stop command.
-  notify(NULL);
-}
-
-/**
- * Switches the calling thread between the default scheduler (SCHED_OTHER)
- * and a SCHED_FIFO priority halfway between that policy's minimum and
- * maximum. Does nothing when HAVE_SCHED_H is not defined.
- *
- * @param value true to request the high-priority setup, false to request
- *        the standard one.
- */
-void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
-#ifdef HAVE_SCHED_H
-  // Start out with a standard, low-priority setup for the sched params.
-  struct sched_param sp;
-  memset(&sp, 0, sizeof(sp));
-  int policy = SCHED_OTHER;
-
-  // If desired, set up high-priority sched params structure.
-  if (value) {
-    // FIFO scheduler, ranked above default SCHED_OTHER queue
-    policy = SCHED_FIFO;
-    // The priority only compares us to other SCHED_FIFO threads, so we
-    // just pick a priority halfway between min & max.
-    const int priority = (sched_get_priority_max(policy) +
-                          sched_get_priority_min(policy)) / 2;
-
-    sp.sched_priority = priority;
-  }
-
-  // Actually set the sched params for the current thread.
-  // pthread_setschedparam() returns the error number itself, so that value
-  // is what gets reported on failure.
-  const int rc = pthread_setschedparam(pthread_self(), policy, &sp);
-  if (rc == 0) {
-    if (value) {
-      GlobalOutput.printf(
-        "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
-    } else {
-      // Restoring the default setup must not be logged as high priority.
-      GlobalOutput.printf(
-        "TNonblocking: IO Thread #%d using normal-priority scheduler.", number_);
-    }
-  } else {
-    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", rc);
-  }
-#else
-  THRIFT_UNUSED_VARIABLE(value);
-#endif
-}
-
-/**
- * Body of the IO thread: registers the events if that has not happened
- * yet, runs the libevent loop until it exits, then removes the registered
- * events. The scheduling priority is raised around the loop when
- * useHighPriority_ is set.
- */
-void TNonblockingIOThread::run() {
-  // Without an event base, the events were never registered.
-  if (!eventBase_) {
-    registerEvents();
-  }
-
-  GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
-                      number_);
-
-  if (useHighPriority_) {
-    setCurrentThreadHighPriority(true);
-  }
-
-  // Hand control to libevent; the registered handlers are invoked from
-  // inside this call.
-  event_base_loop(eventBase_, 0);
-
-  if (useHighPriority_) {
-    setCurrentThreadHighPriority(false);
-  }
-
-  // The loop is over: drop our registered events.
-  cleanupEvents();
-
-  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
-    number_);
-}
-
-/**
- * Removes the events added by registerEvents(): the listen event when this
- * thread has a listen socket (a failure is logged), then the notification
- * event.
- */
-void TNonblockingIOThread::cleanupEvents() {
-  // The server event only exists on a thread that has a listen socket.
-  if (listenSocket_ >= 0 && event_del(&serverEvent_) == -1) {
-    GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
-  }
-
-  event_del(&notificationEvent_);
-}
-
-
-/**
- * Asks this IO thread to leave its event loop by calling breakLoop(false),
- * i.e. without signalling an error.
- */
-void TNonblockingIOThread::stop() {
-  // This should cause the thread to fall out of its event loop ASAP.
-  breakLoop(false);
-}
-
-/**
- * Waits for the associated thread, if there is one. Any exception thrown
- * by the underlying join() is swallowed.
- */
-void TNonblockingIOThread::join() {
-  // Only threads created by a factory have thread_ set; the thread that
-  // called serve() has nothing to join.
-  if (!thread_) {
-    return;
-  }
-
-  try {
-    // Joining twice, or joining the current thread, is considered safe here:
-    // the pthread implementation checks for deadlock.
-    thread_->join();
-  } catch(...) {
-    // swallow everything
-  }
-}
-
-}}} // apache::thrift::server

http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.h
deleted file mode 100644
index 532d4ae..0000000
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.h
+++ /dev/null
@@ -1,944 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
-#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
-
-#include <thrift/Thrift.h>
-#include <thrift/server/TServer.h>
-#include <thrift/transport/PlatformSocket.h>
-#include <thrift/transport/TBufferTransports.h>
-#include <thrift/transport/TSocket.h>
-#include <thrift/concurrency/ThreadManager.h>
-#include <climits>
-#include <thrift/concurrency/Thread.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
-#include <thrift/concurrency/Mutex.h>
-#include <stack>
-#include <vector>
-#include <string>
-#include <cstdlib>
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-#include <event.h>
-
-
-
-namespace apache { namespace thrift { namespace server {
-
-using apache::thrift::transport::TMemoryBuffer;
-using apache::thrift::transport::TSocket;
-using apache::thrift::protocol::TProtocol;
-using apache::thrift::concurrency::Runnable;
-using apache::thrift::concurrency::ThreadManager;
-using apache::thrift::concurrency::PlatformThreadFactory;
-using apache::thrift::concurrency::ThreadFactory;
-using apache::thrift::concurrency::Thread;
-using apache::thrift::concurrency::Mutex;
-using apache::thrift::concurrency::Guard;
-
-#ifdef LIBEVENT_VERSION_NUMBER
-#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
-#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
-#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
-#else
-// assume latest version 1 series
-#define LIBEVENT_VERSION_MAJOR 1
-#define LIBEVENT_VERSION_MINOR 14
-#define LIBEVENT_VERSION_REL 13
-#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
-#endif
-
-#if LIBEVENT_VERSION_NUMBER < 0x02000000
- typedef THRIFT_SOCKET evutil_socket_t;
-#endif
-
-#ifndef SOCKOPT_CAST_T
-#   ifndef _WIN32
-#       define SOCKOPT_CAST_T void
-#   else
-#       define SOCKOPT_CAST_T char
-#   endif // _WIN32
-#endif
-
-/// Converts a pointer to a const option value into the const pointer type
-/// that the socket calls expect on this platform (SOCKOPT_CAST_T).
-template<class T>
-inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
-  const void* untyped = v;
-  return static_cast<const SOCKOPT_CAST_T*>(untyped);
-}
-
-/// Converts a pointer to a writable buffer into the pointer type that the
-/// socket calls expect on this platform (SOCKOPT_CAST_T).
-template<class T>
-inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
-  void* untyped = v;
-  return static_cast<SOCKOPT_CAST_T*>(untyped);
-}
-
-/**
- * This is a non-blocking server in C++ for high performance that
- * operates a set of IO threads (by default only one). It assumes that
- * all incoming requests are framed with a 4 byte length indicator and
- * writes out responses using the same framing.
- *
- * It does not use the TServerTransport framework, but rather has socket
- * operations hardcoded for use with select.
- *
- */
-
-
-/// Actions the server can take when it is overloaded.
-enum TOverloadAction {
-  T_OVERLOAD_NO_ACTION = 0,        ///< Don't handle overload
-  T_OVERLOAD_CLOSE_ON_ACCEPT = 1,  ///< Drop new connections immediately
-  T_OVERLOAD_DRAIN_TASK_QUEUE = 2  ///< Drop some tasks from head of task queue
-};
-
-class TNonblockingIOThread;
-
-class TNonblockingServer : public TServer {
- private:
-  class TConnection;
-
-  friend class TNonblockingIOThread;
- private:
-  /// Listen backlog
-  static const int LISTEN_BACKLOG = 1024;
-
-  /// Default limit on size of idle connection pool
-  static const size_t CONNECTION_STACK_LIMIT = 1024;
-
-  /// Default limit on frame size
-  static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
-
-  /// Default limit on total number of connected sockets
-  static const int MAX_CONNECTIONS = INT_MAX;
-
-  /// Default limit on connections in handler/task processing
-  static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
-
-  /// Default size of write buffer
-  static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
-
-  /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
-  static const int IDLE_READ_BUFFER_LIMIT = 1024;
-
-  /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
-  static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
-
-  /// # of calls before resizing oversized buffers (0 = check only on close)
-  static const int RESIZE_BUFFER_EVERY_N = 512;
-
-  /// # of IO threads to use by default
-  static const int DEFAULT_IO_THREADS = 1;
-
-  /// # of IO threads this server will use
-  size_t numIOThreads_;
-
-  /// Whether to set high scheduling priority for IO threads
-  bool useHighPriorityIOThreads_;
-
-  /// Server socket file descriptor
-  THRIFT_SOCKET serverSocket_;
-
-  /// Port server runs on
-  int port_;
-
-  /// The optional user-provided event-base (for single-thread servers)
-  event_base* userEventBase_;
-
-  /// For processing via thread pool, may be NULL
-  boost::shared_ptr<ThreadManager> threadManager_;
-
-  /// Is thread pool processing?
-  bool threadPoolProcessing_;
-
-  // Factory to create the IO threads
-  boost::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
-
-  // Vector of IOThread objects that will handle our IO
-  std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
-
-  // Index of next IO Thread to be used (for round-robin)
-  uint32_t nextIOThread_;
-
-  // Synchronizes access to connection stack and similar data
-  Mutex connMutex_;
-
-  /// Number of TConnection object we've created
-  size_t numTConnections_;
-
-  /// Number of Connections processing or waiting to process
-  size_t numActiveProcessors_;
-
-  /// Limit for how many TConnection objects to cache
-  size_t connectionStackLimit_;
-
-  /// Limit for number of connections processing or waiting to process
-  size_t maxActiveProcessors_;
-
-  /// Limit for number of open connections
-  size_t maxConnections_;
-
-  /// Limit for frame size
-  size_t maxFrameSize_;
-
-  /// Time in milliseconds before an unperformed task expires (0 == infinite).
-  int64_t taskExpireTime_;
-
-  /**
-   * Hysteresis for overload state.  This is the fraction of the overload
-   * value that needs to be reached before the overload state is cleared;
-   * must be <= 1.0.
-   */
-  double overloadHysteresis_;
-
-  /// Action to take when we're overloaded.
-  TOverloadAction overloadAction_;
-
-  /**
-   * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
-   * and found to be exceeded, reinitialized) to this size.
-   */
-  size_t writeBufferDefaultSize_;
-
-  /**
-   * Max read buffer size for an idle TConnection.  When we place an idle
-   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
-   * we will free the buffer (such that it will be reinitialized by the next
-   * received frame) if it has exceeded this limit.  0 disables this check.
-   */
-  size_t idleReadBufferLimit_;
-
-  /**
-   * Max write buffer size for an idle connection.  When we place an idle
-   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
-   * we ensure that its write buffer is <= to this size; otherwise we
-   * replace it with a new one of writeBufferDefaultSize_ bytes to ensure that
-   * idle connections don't hog memory. 0 disables this check.
-   */
-  size_t idleWriteBufferLimit_;
-
-  /**
-   * Every N calls we check the buffer size limits on a connected TConnection.
-   * 0 disables (i.e. the checks are only done when a connection closes).
-   */
-  int32_t resizeBufferEveryN_;
-
-  /// Set if we are currently in an overloaded state.
-  bool overloaded_;
-
-  /// Count of connections dropped since overload started
-  uint32_t nConnectionsDropped_;
-
-  /// Count of connections dropped on overload since server started
-  uint64_t nTotalConnectionsDropped_;
-
-  /**
-   * This is a stack of all the objects that have been created but that
-   * are NOT currently in use. When we close a connection, we place it on this
-   * stack so that the object can be reused later, rather than freeing the
-   * memory and reallocating a new object later.
-   */
-  std::stack<TConnection*> connectionStack_;
-
-  /**
-   * This container holds pointers to all active connections. This container
-   * allows the server to clean up unclosed connection objects at destruction,
-   * which in turn allows their transports, protocols, processors and handlers
-   * to deallocate and clean up correctly.
-   */
-  std::vector<TConnection*> activeConnections_;
-
-  /**
-   * Called when server socket had something happen.  We accept all waiting
-   * client connections on listen socket fd and assign TConnection objects
-   * to handle those requests.
-   *
-   * @param fd the listen socket.
-   * @param which the event flag that triggered the handler.
-   */
-  void handleEvent(THRIFT_SOCKET fd, short which);
-
-  void init(int port) {
-    serverSocket_ = THRIFT_INVALID_SOCKET;
-    numIOThreads_ = DEFAULT_IO_THREADS;
-    nextIOThread_ = 0;
-    useHighPriorityIOThreads_ = false;
-    port_ = port;
-    userEventBase_ = NULL;
-    threadPoolProcessing_ = false;
-    numTConnections_ = 0;
-    numActiveProcessors_ = 0;
-    connectionStackLimit_ = CONNECTION_STACK_LIMIT;
-    maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
-    maxConnections_ = MAX_CONNECTIONS;
-    maxFrameSize_ = MAX_FRAME_SIZE;
-    taskExpireTime_ = 0;
-    overloadHysteresis_ = 0.8;
-    overloadAction_ = T_OVERLOAD_NO_ACTION;
-    writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
-    idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
-    idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
-    resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
-    overloaded_ = false;
-    nConnectionsDropped_ = 0;
-    nTotalConnectionsDropped_ = 0;
-  }
-
- public:
-  template<typename ProcessorFactory>
-  TNonblockingServer(
-      const boost::shared_ptr<ProcessorFactory>& processorFactory,
-      int port,
-      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
-    TServer(processorFactory) {
-    init(port);
-  }
-
-  template<typename Processor>
-  TNonblockingServer(const boost::shared_ptr<Processor>& processor,
-                     int port,
-                     THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
-    TServer(processor) {
-    init(port);
-  }
-
-  template<typename ProcessorFactory>
-  TNonblockingServer(
-      const boost::shared_ptr<ProcessorFactory>& processorFactory,
-      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
-      int port,
-      const boost::shared_ptr<ThreadManager>& threadManager =
-        boost::shared_ptr<ThreadManager>(),
-      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
-    TServer(processorFactory) {
-
-    init(port);
-
-    setInputProtocolFactory(protocolFactory);
-    setOutputProtocolFactory(protocolFactory);
-    setThreadManager(threadManager);
-  }
-
-  template<typename Processor>
-  TNonblockingServer(
-      const boost::shared_ptr<Processor>& processor,
-      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
-      int port,
-      const boost::shared_ptr<ThreadManager>& threadManager =
-        boost::shared_ptr<ThreadManager>(),
-      THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
-    TServer(processor) {
-
-    init(port);
-
-    setInputProtocolFactory(protocolFactory);
-    setOutputProtocolFactory(protocolFactory);
-    setThreadManager(threadManager);
-  }
-
-  template<typename ProcessorFactory>
-  TNonblockingServer(
-      const boost::shared_ptr<ProcessorFactory>& processorFactory,
-      const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
-      const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
-      const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
-      const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
-      int port,
-      const boost::shared_ptr<ThreadManager>& threadManager =
-        boost::shared_ptr<ThreadManager>(),
-      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
-    TServer(processorFactory) {
-
-    init(port);
-
-    setInputTransportFactory(inputTransportFactory);
-    setOutputTransportFactory(outputTransportFactory);
-    setInputProtocolFactory(inputProtocolFactory);
-    setOutputProtocolFactory(outputProtocolFactory);
-    setThreadManager(threadManager);
-  }
-
-  template<typename Processor>
-  TNonblockingServer(
-      const boost::shared_ptr<Processor>& processor,
-      const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
-      const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
-      const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
-      const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
-      int port,
-      const boost::shared_ptr<ThreadManager>& threadManager =
-        boost::shared_ptr<ThreadManager>(),
-      THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
-    TServer(processor) {
-
-    init(port);
-
-    setInputTransportFactory(inputTransportFactory);
-    setOutputTransportFactory(outputTransportFactory);
-    setInputProtocolFactory(inputProtocolFactory);
-    setOutputProtocolFactory(outputProtocolFactory);
-    setThreadManager(threadManager);
-  }
-
-  ~TNonblockingServer();
-
-  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
-
-  boost::shared_ptr<ThreadManager> getThreadManager() {
-    return threadManager_;
-  }
-
-  /**
-   * Sets the number of IO threads used by this server. Can only be used before
-   * the call to serve() and has no effect afterwards.  We always use a
-   * PosixThreadFactory for the IO worker threads, because they must be joinable
-   * for clean shutdown.
-   */
-  void setNumIOThreads(size_t numThreads) {
-    numIOThreads_ = numThreads;
-  }
-
-  /** Return whether the IO threads will get high scheduling priority */
-  bool useHighPriorityIOThreads() const {
-    return useHighPriorityIOThreads_;
-  }
-
-  /** Set whether the IO threads will get high scheduling priority. */
-  void setUseHighPriorityIOThreads(bool val) {
-    useHighPriorityIOThreads_ = val;
-  }
-
-  /** Return the number of IO threads used by this server. */
-  size_t getNumIOThreads() const {
-    return numIOThreads_;
-  }
-
-  /**
-   * Get the maximum number of unused TConnection we will hold in reserve.
-   *
-   * @return the current limit on TConnection pool size.
-   */
-  size_t getConnectionStackLimit() const {
-    return connectionStackLimit_;
-  }
-
-  /**
-   * Set the maximum number of unused TConnection we will hold in reserve.
-   *
-   * @param sz the new limit for TConnection pool size.
-   */
-  void setConnectionStackLimit(size_t sz) {
-    connectionStackLimit_ = sz;
-  }
-
-  bool isThreadPoolProcessing() const {
-    return threadPoolProcessing_;
-  }
-
-  void addTask(boost::shared_ptr<Runnable> task) {
-    threadManager_->add(task, 0LL, taskExpireTime_);
-  }
-
-  /**
-   * Return the count of sockets currently connected to.
-   *
-   * @return count of connected sockets.
-   */
-  size_t getNumConnections() const {
-    return numTConnections_;
-  }
-
-  /**
-   * Return the count of connections in use (all connections minus idle ones).
-   *
-   * @return count of connections that are not idle.
-   */
-  size_t getNumActiveConnections() const {
-    return getNumConnections() - getNumIdleConnections();
-  }
-
-  /**
-   * Return the count of connection objects allocated but not in use.
-   *
-   * @return count of idle connection objects.
-   */
-  size_t getNumIdleConnections() const {
-    return connectionStack_.size();
-  }
-
-  /**
-   * Return count of number of connections which are currently processing.
-   * This is defined as a connection where all data has been received and
-   * either assigned a task (when threading) or passed to a handler (when
-   * not threading), and where the handler has not yet returned.
-   *
-   * @return # of connections currently processing.
-   */
-  size_t getNumActiveProcessors() const {
-    return numActiveProcessors_;
-  }
-
-  /// Increment the count of connections currently processing.
-  void incrementActiveProcessors() {
-    Guard g(connMutex_);
-    ++numActiveProcessors_;
-  }
-
-  /// Decrement the count of connections currently processing.
-  void decrementActiveProcessors() {
-    Guard g(connMutex_);
-    if (numActiveProcessors_ > 0) {
-      --numActiveProcessors_;
-    }
-  }
-
-  /**
-   * Get the maximum # of connections allowed before overload.
-   *
-   * @return current setting.
-   */
-  size_t getMaxConnections() const {
-    return maxConnections_;
-  }
-
-  /**
-   * Set the maximum # of connections allowed before overload.
-   *
-   * @param maxConnections new setting for maximum # of connections.
-   */
-  void setMaxConnections(size_t maxConnections) {
-    maxConnections_ = maxConnections;
-  }
-
-  /**
-   * Get the maximum # of connections waiting in handler/task before overload.
-   *
-   * @return current setting.
-   */
-  size_t getMaxActiveProcessors() const {
-    return maxActiveProcessors_;
-  }
-
-  /**
-   * Set the maximum # of connections waiting in handler/task before overload.
-   *
-   * @param maxActiveProcessors new setting for maximum # of active processors.
-   */
-  void setMaxActiveProcessors(size_t maxActiveProcessors) {
-    maxActiveProcessors_ = maxActiveProcessors;
-  }
-
-  /**
-   * Get the maximum allowed frame size.
-   *
-   * If a client tries to send a message larger than this limit,
-   * its connection will be closed.
-   *
-   * @return Maximum frame size, in bytes.
-   */
-  size_t getMaxFrameSize() const {
-    return maxFrameSize_;
-  }
-
-  /**
-   * Set the maximum allowed frame size.
-   *
-   * @param maxFrameSize The new maximum frame size.
-   */
-  void setMaxFrameSize(size_t maxFrameSize) {
-    maxFrameSize_ = maxFrameSize;
-  }
-
-  /**
-   * Get fraction of maximum limits before an overload condition is cleared.
-   *
-   * @return hysteresis fraction
-   */
-  double getOverloadHysteresis() const {
-    return overloadHysteresis_;
-  }
-
-  /**
-   * Set fraction of maximum limits before an overload condition is cleared.
-   * A good value would probably be between 0.5 and 0.9.
-   *
-   * @param hysteresisFraction fraction > 0.0 and <= 1.0; other values are ignored.
-   */
-  void setOverloadHysteresis(double hysteresisFraction) {
-    if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
-      overloadHysteresis_ = hysteresisFraction;
-    }
-  }
-
-  /**
-   * Get the action the server will take on overload.
-   *
-   * @return a TOverloadAction enum value for the currently set action.
-   */
-  TOverloadAction getOverloadAction() const {
-    return overloadAction_;
-  }
-
-  /**
-   * Set the action the server is to take on overload.
-   *
-   * @param overloadAction a TOverloadAction enum value for the action.
-   */
-  void setOverloadAction(TOverloadAction overloadAction) {
-    overloadAction_ = overloadAction;
-  }
-
-  /**
-   * Get the time in milliseconds after which a task expires (0 == infinite).
-   *
-   * @return a 64-bit time in milliseconds.
-   */
-  int64_t getTaskExpireTime() const {
-    return taskExpireTime_;
-  }
-
-  /**
-   * Set the time in milliseconds after which a task expires (0 == infinite).
-   *
-   * @param taskExpireTime a 64-bit time in milliseconds.
-   */
-  void setTaskExpireTime(int64_t taskExpireTime) {
-    taskExpireTime_ = taskExpireTime;
-  }
-
-  /**
-   * Determine if the server is currently overloaded.
-   * This function checks the maximums for open connections and connections
-   * currently in processing, and sets an overload condition if they are
-   * exceeded.  The overload will persist until both values are below the
-   * current hysteresis fraction of their maximums.
-   *
-   * @return true if an overload condition exists, false if not.
-   */
-  bool serverOverloaded();
-
-  /** Pop and discard next task on threadpool wait queue.
-   *
-   * @return true if a task was discarded, false if the wait queue was empty.
-   */
-  bool drainPendingTask();
-
-  /**
-   * Get the starting size of a TConnection object's write buffer.
-   *
-   * @return # bytes we initialize a TConnection object's write buffer to.
-   */
-  size_t getWriteBufferDefaultSize() const {
-    return writeBufferDefaultSize_;
-  }
-
-  /**
-   * Set the starting size of a TConnection object's write buffer.
-   *
-   * @param size # bytes we initialize a TConnection object's write buffer to.
-   */
-  void setWriteBufferDefaultSize(size_t size) {
-    writeBufferDefaultSize_ = size;
-  }
-
-  /**
-   * Get the maximum size of read buffer allocated to idle TConnection objects.
-   *
-   * @return # bytes beyond which we will dealloc idle buffer.
-   */
-  size_t getIdleReadBufferLimit() const {
-    return idleReadBufferLimit_;
-  }
-
-  /**
-   * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
-   * Get the maximum size of read buffer allocated to idle TConnection objects.
-   *
-   * @return # bytes beyond which we will dealloc idle buffer.
-   */
-  size_t getIdleBufferMemLimit() const {
-    return idleReadBufferLimit_;
-  }
-
-  /**
-   * Set the maximum size read buffer allocated to idle TConnection objects.
-   * If a TConnection object is found (either on connection close or between
-   * calls when resizeBufferEveryN_ is set) with more than this much memory
-   * allocated to its read buffer, we free it and allow it to be reinitialized
-   * on the next received frame.
-   *
-   * @param limit of bytes beyond which we will shrink buffers when checked.
-   */
-  void setIdleReadBufferLimit(size_t limit) {
-    idleReadBufferLimit_ = limit;
-  }
-
-  /**
-   * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
-   * Set the maximum size read buffer allocated to idle TConnection objects.
-   * If a TConnection object is found (either on connection close or between
-   * calls when resizeBufferEveryN_ is set) with more than this much memory
-   * allocated to its read buffer, we free it and allow it to be reinitialized
-   * on the next received frame.
-   *
-   * @param limit of bytes beyond which we will shrink buffers when checked.
-   */
-  void setIdleBufferMemLimit(size_t limit) {
-    idleReadBufferLimit_ = limit;
-  }
-
-
-
-  /**
-   * Get the maximum size of write buffer allocated to idle TConnection objects.
-   *
-   * @return # bytes beyond which we will reallocate buffers when checked.
-   */
-  size_t getIdleWriteBufferLimit() const {
-    return idleWriteBufferLimit_;
-  }
-
-  /**
-   * Set the maximum size write buffer allocated to idle TConnection objects.
-   * If a TConnection object is found (either on connection close or between
-   * calls when resizeBufferEveryN_ is set) with more than this much memory
-   * allocated to its write buffer, we destroy and construct that buffer with
-   * writeBufferDefaultSize_ bytes.
-   *
-   * @param limit of bytes beyond which we will shrink buffers when idle.
-   */
-  void setIdleWriteBufferLimit(size_t limit) {
-    idleWriteBufferLimit_ = limit;
-  }
-
-  /**
-   * Get # of calls made between buffer size checks.  0 means disabled.
-   *
-   * @return # of calls between buffer size checks.
-   */
-  int32_t getResizeBufferEveryN() const {
-    return resizeBufferEveryN_;
-  }
-
-  /**
-   * Check buffer sizes every "count" calls.  This allows buffer limits
-   * to be enforced for persistent connections with a controllable degree
-   * of overhead. 0 disables checks except at connection close.
-   *
-   * @param count the number of calls between checks, or 0 to disable
-   */
-  void setResizeBufferEveryN(int32_t count) {
-    resizeBufferEveryN_ = count;
-  }
-
-  /**
-   * Main workhorse function, starts up the server listening on a port and
-   * loops over the libevent handler.
-   */
-  void serve();
-
-  /**
-   * Causes the server to terminate gracefully (can be called from any thread).
-   */
-  void stop();
-
-  /// Creates a socket to listen on and binds it to the local port.
-  void createAndListenOnSocket();
-
-  /**
-   * Takes a socket created by createAndListenOnSocket() and sets various
-   * options on it to prepare for use in the server.
-   *
-   * @param fd descriptor of socket to be initialized.
-   */
-  void listenSocket(THRIFT_SOCKET fd);
-
-  /**
-   * Register the optional user-provided event-base (for single-thread servers)
-   *
-   * This method should be used when the server is running in a single-thread
-   * mode, and the event base is provided by the user (i.e., the caller).
-   *
-   * @param user_event_base the user-provided event-base. The user is
-   * responsible for freeing the event base memory.
-   */
-  void registerEvents(event_base* user_event_base);
-
-  /**
-   * Returns the optional user-provided event-base (for single-thread servers).
-   */
-  event_base* getUserEventBase() const { return userEventBase_; }
-
- private:
-  /**
-   * Callback function that the threadmanager calls when a task reaches
-   * its expiration time.  It is needed to clean up the expired connection.
-   *
-   * @param task the runnable associated with the expired task.
-   */
-  void expireClose(boost::shared_ptr<Runnable> task);
-
-  /**
-   * Return an initialized connection object.  Creates or recovers from
-   * pool a TConnection and initializes it with the provided socket FD
-   * and flags.
-   *
-   * @param socket FD of socket associated with this connection.
-   * @param addr the sockaddr of the client
-   * @param addrLen the length of addr
-   * @return pointer to initialized TConnection object.
-   */
-  TConnection* createConnection(THRIFT_SOCKET socket, const sockaddr* addr,
-                                            socklen_t addrLen);
-
-  /**
-   * Returns a connection to pool or deletion.  If the connection pool
-   * (a stack) isn't full, place the connection object on it, otherwise
-   * just delete it.
-   *
-   * @param connection the TConnection being returned.
-   */
-  void returnConnection(TConnection* connection);
-};
-
-class TNonblockingIOThread : public Runnable {
- public:
-  // Creates an IO thread and sets up the event base.  The listenSocket should
-  // be a valid FD on which listen() has already been called.  If the
-  // listenSocket is < 0, accepting will not be done.
-  TNonblockingIOThread(TNonblockingServer* server,
-                       int number,
-                       THRIFT_SOCKET listenSocket,
-                       bool useHighPriority);
-
-  ~TNonblockingIOThread();
-
-  // Returns the event-base for this thread.
-  event_base* getEventBase() const { return eventBase_; }
-
-  // Returns the server for this thread.
-  TNonblockingServer* getServer() const { return server_; }
-
-  // Returns the number of this IO thread.
-  int getThreadNumber() const { return number_; }
-
-  // Returns the thread id associated with this object.  This should
-  // only be called after the thread has been started.
-  Thread::id_t getThreadId() const { return threadId_; }
-
-  // Returns the send-fd for task complete notifications.
-  evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
-
-  // Returns the read-fd for task complete notifications.
-  evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
-
-  // Returns the actual thread object associated with this IO thread.
-  boost::shared_ptr<Thread> getThread() const { return thread_; }
-
-  // Sets the actual thread object associated with this IO thread.
-  void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
-
-  // Used by TConnection objects to indicate processing has finished.
-  bool notify(TNonblockingServer::TConnection* conn);
-
-  // Enters the event loop and does not return until a call to stop().
-  virtual void run();
-
-  // Exits the event loop as soon as possible.
-  void stop();
-
-  // Ensures that the event-loop thread is fully finished and shut down.
-  void join();
-
-  /// Registers the events for the notification & listen sockets
-  void registerEvents();
-
- private:
-  /**
-   * C-callable event handler for signaling task completion.  Provides a
-   * callback that libevent can understand that will read a connection
-   * object's address from a pipe and call connection->transition() for
-   * that object.
-   *
-   * @param fd the descriptor the event occurred on.
-   */
-  static void notifyHandler(evutil_socket_t fd, short which, void* v);
-
-  /**
-   * C-callable event handler for listener events.  Provides a callback
-   * that libevent can understand which invokes server->handleEvent().
-   *
-   * @param fd the descriptor the event occurred on.
-   * @param which the flags associated with the event.
-   * @param v void* callback arg where we placed TNonblockingServer's "this".
-   */
-  static void listenHandler(evutil_socket_t fd, short which, void* v) {
-    ((TNonblockingServer*)v)->handleEvent(fd, which);
-  }
-
-  /// Exits the loop ASAP in case of shutdown or error.
-  void breakLoop(bool error);
-
-  /// Create the pipe used to notify I/O process of task completion.
-  void createNotificationPipe();
-
-  /// Unregisters our events for notification and listen sockets.
-  void cleanupEvents();
-
-  /// Sets (or clears) high priority scheduling status for the current thread.
-  void setCurrentThreadHighPriority(bool value);
-
- private:
-  /// associated server
-  TNonblockingServer* server_;
-
-  /// thread number (for debugging).
-  const int number_;
-
-  /// The actual physical thread id.
-  Thread::id_t threadId_;
-
-  /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
-  THRIFT_SOCKET listenSocket_;
-
-  /// Sets a high scheduling priority when running
-  bool useHighPriority_;
-
-  /// pointer to eventbase to be used for looping
-  event_base* eventBase_;
-
-  /// Set to true if this class is responsible for freeing the event base
-  /// memory.
-  bool ownEventBase_;
-
-  /// Used with eventBase_ for connection events (only in listener thread)
-  struct event serverEvent_;
-
-  /// Used with eventBase_ for task completion notification
-  struct event notificationEvent_;
-
-  /// File descriptors for pipe used for task completion notification.
-  evutil_socket_t notificationPipeFDs_[2];
-
-  /// Actual IO Thread
-  boost::shared_ptr<Thread> thread_;
-};
-
-}}} // apache::thrift::server
-
-#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_

http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TServer.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TServer.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TServer.cpp
deleted file mode 100755
index f4ce744..0000000
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TServer.cpp
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <thrift/thrift-config.h>
-
-#ifdef HAVE_SYS_TIME_H
-#include <sys/time.h>
-#endif
-#ifdef HAVE_SYS_RESOURCE_H
-#include <sys/resource.h>
-#endif
-
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-
-namespace apache { namespace thrift { namespace server {
-
-int increase_max_fds(int max_fds=(1<<24))  {
-  struct rlimit fdmaxrl;
-
-  for(fdmaxrl.rlim_cur = max_fds, fdmaxrl.rlim_max = max_fds;
-      max_fds && (setrlimit(RLIMIT_NOFILE, &fdmaxrl) < 0);
-      fdmaxrl.rlim_cur = max_fds, fdmaxrl.rlim_max = max_fds) {
-    max_fds /= 2;
-  }
-
-  return static_cast<int>(fdmaxrl.rlim_cur);
-}
-
-}}} // apache::thrift::server


Mime
View raw message