airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sma...@apache.org
Subject [06/47] Added c++ client samples for integrattion of airavata with any other application's c++ interface
Date Sat, 12 Jul 2014 04:08:24 GMT
http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.cpp
new file mode 100644
index 0000000..b9553c4
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.cpp
@@ -0,0 +1,1567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define __STDC_FORMAT_MACROS
+
+#include <thrift/thrift-config.h>
+
+#include <thrift/server/TNonblockingServer.h>
+#include <thrift/concurrency/Exception.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/transport/PlatformSocket.h>
+
+#include <iostream>
+
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
+
+#ifdef HAVE_NETINET_IN_H
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#endif
+
+#ifdef HAVE_ARPA_INET_H
+#include <arpa/inet.h>
+#endif
+
+#ifdef HAVE_NETDB_H
+#include <netdb.h>
+#endif
+
+#ifdef HAVE_FCNTL_H
+#include <fcntl.h>
+#endif
+
+#include <assert.h>
+
+#ifdef HAVE_SCHED_H
+#include <sched.h>
+#endif
+
+#ifndef AF_LOCAL
+#define AF_LOCAL AF_UNIX
+#endif
+
+#if !defined(PRIu32)
+#define PRIu32 "I32u"
+#define PRIu64 "I64u"
+#endif
+
+namespace apache { namespace thrift { namespace server {
+
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::concurrency;
+using namespace std;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransportException;
+using boost::shared_ptr;
+
+/// Three states for sockets: recv frame size, recv data, and send mode
+enum TSocketState {
+  SOCKET_RECV_FRAMING,
+  SOCKET_RECV,
+  SOCKET_SEND
+};
+
+/**
+ * Five states for the nonblocking server:
+ *  1) initialize
+ *  2) read 4 byte frame size
+ *  3) read frame of data
+ *  4) send back data (if any)
+ *  5) force immediate connection close
+ */
+enum TAppState {
+  APP_INIT,
+  APP_READ_FRAME_SIZE,
+  APP_READ_REQUEST,
+  APP_WAIT_TASK,
+  APP_SEND_RESULT,
+  APP_CLOSE_CONNECTION
+};
+
+/**
+ * Represents a connection that is handled via libevent. This connection
+ * essentially encapsulates a socket that has some associated libevent state.
+ */
+class TNonblockingServer::TConnection {
+ private:
+  /// Server IO Thread handling this connection
+  TNonblockingIOThread* ioThread_;
+
+  /// Server handle
+  TNonblockingServer* server_;
+
+  /// TProcessor
+  boost::shared_ptr<TProcessor> processor_;
+
+  /// Object wrapping network socket
+  boost::shared_ptr<TSocket> tSocket_;
+
+  /// Libevent object
+  struct event event_;
+
+  /// Libevent flags
+  short eventFlags_;
+
+  /// Socket mode
+  TSocketState socketState_;
+
+  /// Application state
+  TAppState appState_;
+
+  /// How much data needed to read
+  uint32_t readWant_;
+
+  /// Where in the read buffer are we
+  uint32_t readBufferPos_;
+
+  /// Read buffer
+  uint8_t* readBuffer_;
+
+  /// Read buffer size
+  uint32_t readBufferSize_;
+
+  /// Write buffer
+  uint8_t* writeBuffer_;
+
+  /// Write buffer size
+  uint32_t writeBufferSize_;
+
+  /// How far through writing are we?
+  uint32_t writeBufferPos_;
+
+  /// Largest size of write buffer seen since buffer was constructed
+  size_t largestWriteBufferSize_;
+
+  /// Count of the number of calls for use with getResizeBufferEveryN().
+  int32_t callsForResize_;
+
+  /// Task handle
+  int taskHandle_;
+
+  /// Task event
+  struct event taskEvent_;
+
+  /// Transport to read from
+  boost::shared_ptr<TMemoryBuffer> inputTransport_;
+
+  /// Transport that processor writes to
+  boost::shared_ptr<TMemoryBuffer> outputTransport_;
+
+  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
+  boost::shared_ptr<TTransport> factoryInputTransport_;
+  boost::shared_ptr<TTransport> factoryOutputTransport_;
+
+  /// Protocol decoder
+  boost::shared_ptr<TProtocol> inputProtocol_;
+
+  /// Protocol encoder
+  boost::shared_ptr<TProtocol> outputProtocol_;
+
+  /// Server event handler, if any
+  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
+
+  /// Thrift call context, if any
+  void *connectionContext_;
+
+  /// Go into read mode
+  void setRead() {
+    setFlags(EV_READ | EV_PERSIST);
+  }
+
+  /// Go into write mode
+  void setWrite() {
+    setFlags(EV_WRITE | EV_PERSIST);
+  }
+
+  /// Set socket idle
+  void setIdle() {
+    setFlags(0);
+  }
+
+  /**
+   * Set event flags for this connection.
+   *
+   * @param eventFlags flags we pass to libevent for the connection.
+   */
+  void setFlags(short eventFlags);
+
+  /**
+   * Libevent handler called (via our static wrapper) when the connection
+   * socket had something happen.  Rather than use the flags libevent passed,
+   * we use the connection state to determine whether we need to read or
+   * write the socket.
+   */
+  void workSocket();
+
+ public:
+
+  class Task;
+
+  /// Constructor
+  TConnection(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
+              const sockaddr* addr, socklen_t addrLen) {
+    readBuffer_ = NULL;
+    readBufferSize_ = 0;
+
+    ioThread_ = ioThread;
+    server_ = ioThread->getServer();
+
+    // Allocate input and output transports these only need to be allocated
+    // once per TConnection (they don't need to be reallocated on init() call)
+    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
+    outputTransport_.reset(
+      new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
+    tSocket_.reset(new TSocket());
+    init(socket, ioThread, addr, addrLen);
+  }
+
+  ~TConnection() {
+    std::free(readBuffer_);
+  }
+
+  /// Close this connection and free or reset its resources.
+  void close();
+
+ /**
+   * Check buffers against any size limits and shrink it if exceeded.
+   *
+   * @param readLimit we reduce read buffer size to this (if nonzero).
+   * @param writeLimit if nonzero and write buffer is larger, replace it.
+   */
+  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
+
+  /// Initialize
+  void init(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
+            const sockaddr* addr, socklen_t addrLen);
+
+  /**
+   * This is called when the application transitions from one state into
+   * another. This means that it has finished writing the data that it needed
+   * to, or finished receiving the data that it needed to.
+   */
+  void transition();
+
+  /**
+   * C-callable event handler for connection events.  Provides a callback
+   * that libevent can understand which invokes connection_->workSocket().
+   *
+   * @param fd the descriptor the event occurred on.
+   * @param which the flags associated with the event.
+   * @param v void* callback arg where we placed TConnection's "this".
+   */
+  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
+    assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
+    ((TConnection*)v)->workSocket();
+  }
+
+  /**
+   * Notification to server that processing has ended on this request.
+   * Can be called either when processing is completed or when a waiting
+   * task has been preemptively terminated (on overload).
+   *
+   * Don't call this from the IO thread itself.
+   *
+   * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
+   */
+  bool notifyIOThread() {
+    return ioThread_->notify(this);
+  }
+
+  /*
+   * Returns the number of this connection's currently assigned IO
+   * thread.
+   */
+  int getIOThreadNumber() const {
+    return ioThread_->getThreadNumber();
+  }
+
+  /// Force connection shutdown for this connection.
+  void forceClose() {
+    appState_ = APP_CLOSE_CONNECTION;
+    if (!notifyIOThread()) {
+      throw TException("TConnection::forceClose: failed write on notify pipe");
+    }
+  }
+
+  /// return the server this connection was initialized for.
+  TNonblockingServer* getServer() const {
+    return server_;
+  }
+
+  /// get state of connection.
+  TAppState getState() const {
+    return appState_;
+  }
+
+  /// return the TSocket transport wrapping this network connection
+  boost::shared_ptr<TSocket> getTSocket() const {
+    return tSocket_;
+  }
+
+  /// return the server event handler if any
+  boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
+    return serverEventHandler_;
+  }
+
+  /// return the Thrift connection context if any
+  void* getConnectionContext() {
+    return connectionContext_;
+  }
+
+};
+
+class TNonblockingServer::TConnection::Task: public Runnable {
+ public:
+  Task(boost::shared_ptr<TProcessor> processor,
+       boost::shared_ptr<TProtocol> input,
+       boost::shared_ptr<TProtocol> output,
+       TConnection* connection) :
+    processor_(processor),
+    input_(input),
+    output_(output),
+    connection_(connection),
+    serverEventHandler_(connection_->getServerEventHandler()),
+    connectionContext_(connection_->getConnectionContext()) {}
+
+  void run() {
+    try {
+      for (;;) {
+        if (serverEventHandler_) {
+          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
+        }
+        if (!processor_->process(input_, output_, connectionContext_) ||
+            !input_->getTransport()->peek()) {
+          break;
+        }
+      }
+    } catch (const TTransportException& ttx) {
+      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
+    } catch (const bad_alloc&) {
+      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
+      exit(1);
+    } catch (const std::exception& x) {
+      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
+                          typeid(x).name(), x.what());
+    } catch (...) {
+      GlobalOutput.printf(
+        "TNonblockingServer: unknown exception while processing.");
+    }
+
+    // Signal completion back to the libevent thread via a pipe
+    if (!connection_->notifyIOThread()) {
+      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
+    }
+  }
+
+  TConnection* getTConnection() {
+    return connection_;
+  }
+
+ private:
+  boost::shared_ptr<TProcessor> processor_;
+  boost::shared_ptr<TProtocol> input_;
+  boost::shared_ptr<TProtocol> output_;
+  TConnection* connection_;
+  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
+  void* connectionContext_;
+};
+
+void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
+                                           TNonblockingIOThread* ioThread,
+                                           const sockaddr* addr,
+                                           socklen_t addrLen) {
+  tSocket_->setSocketFD(socket);
+  tSocket_->setCachedAddress(addr, addrLen);
+
+  ioThread_ = ioThread;
+  server_ = ioThread->getServer();
+  appState_ = APP_INIT;
+  eventFlags_ = 0;
+
+  readBufferPos_ = 0;
+  readWant_ = 0;
+
+  writeBuffer_ = NULL;
+  writeBufferSize_ = 0;
+  writeBufferPos_ = 0;
+  largestWriteBufferSize_ = 0;
+
+  socketState_ = SOCKET_RECV_FRAMING;
+  callsForResize_ = 0;
+
+  // get input/transports
+  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
+                             inputTransport_);
+  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
+                             outputTransport_);
+
+  // Create protocol
+  inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
+                     factoryInputTransport_);
+  outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
+                     factoryOutputTransport_);
+
+  // Set up for any server event handler
+  serverEventHandler_ = server_->getEventHandler();
+  if (serverEventHandler_) {
+    connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
+                                                            outputProtocol_);
+  } else {
+    connectionContext_ = NULL;
+  }
+
+  // Get the processor
+  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
+}
+
+void TNonblockingServer::TConnection::workSocket() {
+  int got=0, left=0, sent=0;
+  uint32_t fetch = 0;
+
+  switch (socketState_) {
+  case SOCKET_RECV_FRAMING:
+    union {
+      uint8_t buf[sizeof(uint32_t)];
+      uint32_t size;
+    } framing;
+
+    // if we've already received some bytes we kept them here
+    framing.size = readWant_;
+    // determine size of this frame
+    try {
+      // Read from the socket
+      fetch = tSocket_->read(&framing.buf[readBufferPos_],
+                             uint32_t(sizeof(framing.size) - readBufferPos_));
+      if (fetch == 0) {
+        // Whenever we get here it means a remote disconnect
+        close();
+        return;
+      }
+      readBufferPos_ += fetch;
+    } catch (TTransportException& te) {
+      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+      close();
+
+      return;
+    }
+
+    if (readBufferPos_ < sizeof(framing.size)) {
+      // more needed before frame size is known -- save what we have so far
+      readWant_ = framing.size;
+      return;
+    }
+
+    readWant_ = ntohl(framing.size);
+    if (readWant_ > server_->getMaxFrameSize()) {
+      // Don't allow giant frame sizes.  This prevents bad clients from
+      // causing us to try and allocate a giant buffer.
+      GlobalOutput.printf("TNonblockingServer: frame size too large "
+                          "(%" PRIu32 " > %" PRIu64 ") from client %s. "
+                          "Remote side not using TFramedTransport?",
+                          readWant_,
+                          (uint64_t)server_->getMaxFrameSize(),
+                          tSocket_->getSocketInfo().c_str());
+      close();
+      return;
+    }
+    // size known; now get the rest of the frame
+    transition();
+    return;
+
+  case SOCKET_RECV:
+    // It is an error to be in this state if we already have all the data
+    assert(readBufferPos_ < readWant_);
+
+    try {
+      // Read from the socket
+      fetch = readWant_ - readBufferPos_;
+      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
+    }
+    catch (TTransportException& te) {
+      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+      close();
+
+      return;
+    }
+
+    if (got > 0) {
+      // Move along in the buffer
+      readBufferPos_ += got;
+
+      // Check that we did not overdo it
+      assert(readBufferPos_ <= readWant_);
+
+      // We are done reading, move onto the next state
+      if (readBufferPos_ == readWant_) {
+        transition();
+      }
+      return;
+    }
+
+    // Whenever we get down here it means a remote disconnect
+    close();
+
+    return;
+
+  case SOCKET_SEND:
+    // Should never have position past size
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // If there is no data to send, then let us move on
+    if (writeBufferPos_ == writeBufferSize_) {
+      GlobalOutput("WARNING: Send state with no data to send\n");
+      transition();
+      return;
+    }
+
+    try {
+      left = writeBufferSize_ - writeBufferPos_;
+      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
+    }
+    catch (TTransportException& te) {
+      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
+      close();
+      return;
+    }
+
+    writeBufferPos_ += sent;
+
+    // Did we overdo it?
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // We are done!
+    if (writeBufferPos_ == writeBufferSize_) {
+      transition();
+    }
+
+    return;
+
+  default:
+    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
+    assert(0);
+  }
+}
+
+/**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to.
+ */
+void TNonblockingServer::TConnection::transition() {
+  // ensure this connection is active right now
+  assert(ioThread_);
+  assert(server_);
+
+  // Switch upon the state that we are currently in and move to a new state
+  switch (appState_) {
+
+  case APP_READ_REQUEST:
+    // We are done reading the request, package the read buffer into transport
+    // and get back some data from the dispatch function
+    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
+    outputTransport_->resetBuffer();
+    // Prepend four bytes of blank space to the buffer so we can
+    // write the frame size there later.
+    outputTransport_->getWritePtr(4);
+    outputTransport_->wroteBytes(4);
+
+    server_->incrementActiveProcessors();
+
+    if (server_->isThreadPoolProcessing()) {
+      // We are setting up a Task to do this work and we will wait on it
+
+      // Create task and dispatch to the thread manager
+      boost::shared_ptr<Runnable> task =
+        boost::shared_ptr<Runnable>(new Task(processor_,
+                                             inputProtocol_,
+                                             outputProtocol_,
+                                             this));
+      // The application is now waiting on the task to finish
+      appState_ = APP_WAIT_TASK;
+
+        try {
+          server_->addTask(task);
+        } catch (IllegalStateException & ise) {
+          // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
+          GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
+          close();
+        }
+
+      // Set this connection idle so that libevent doesn't process more
+      // data on it while we're still waiting for the threadmanager to
+      // finish this task
+      setIdle();
+      return;
+    } else {
+      try {
+        if (serverEventHandler_) {
+          serverEventHandler_->processContext(connectionContext_,
+                                              getTSocket());
+        }
+        // Invoke the processor
+        processor_->process(inputProtocol_, outputProtocol_,
+                            connectionContext_);
+      } catch (const TTransportException &ttx) {
+        GlobalOutput.printf("TNonblockingServer transport error in "
+                            "process(): %s", ttx.what());
+        server_->decrementActiveProcessors();
+        close();
+        return;
+      } catch (const std::exception &x) {
+        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
+                            typeid(x).name(), x.what());
+        server_->decrementActiveProcessors();
+        close();
+        return;
+      } catch (...) {
+        GlobalOutput.printf("Server::process() unknown exception");
+        server_->decrementActiveProcessors();
+        close();
+        return;
+      }
+    }
+
+    // Intentionally fall through here, the call to process has written into
+    // the writeBuffer_
+
+  case APP_WAIT_TASK:
+    // We have now finished processing a task and the result has been written
+    // into the outputTransport_, so we grab its contents and place them into
+    // the writeBuffer_ for actual writing by the libevent thread
+
+    server_->decrementActiveProcessors();
+    // Get the result of the operation
+    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
+
+    // If the function call generated return data, then move into the send
+    // state and get going
+    // 4 bytes were reserved for frame size
+    if (writeBufferSize_ > 4) {
+
+      // Move into write state
+      writeBufferPos_ = 0;
+      socketState_ = SOCKET_SEND;
+
+      // Put the frame size into the write buffer
+      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
+      memcpy(writeBuffer_, &frameSize, 4);
+
+      // Socket into write mode
+      appState_ = APP_SEND_RESULT;
+      setWrite();
+
+      // Try to work the socket immediately
+      // workSocket();
+
+      return;
+    }
+
+    // In this case, the request was oneway and we should fall through
+    // right back into the read frame header state
+    goto LABEL_APP_INIT;
+
+  case APP_SEND_RESULT:
+    // it's now safe to perform buffer size housekeeping.
+    if (writeBufferSize_ > largestWriteBufferSize_) {
+      largestWriteBufferSize_ = writeBufferSize_;
+    }
+    if (server_->getResizeBufferEveryN() > 0
+        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
+      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
+                              server_->getIdleWriteBufferLimit());
+      callsForResize_ = 0;
+    }
+
+    // N.B.: We also intentionally fall through here into the INIT state!
+
+  LABEL_APP_INIT:
+  case APP_INIT:
+
+    // Clear write buffer variables
+    writeBuffer_ = NULL;
+    writeBufferPos_ = 0;
+    writeBufferSize_ = 0;
+
+    // Into read4 state we go
+    socketState_ = SOCKET_RECV_FRAMING;
+    appState_ = APP_READ_FRAME_SIZE;
+
+    readBufferPos_ = 0;
+
+    // Register read event
+    setRead();
+
+    // Try to work the socket right away
+    // workSocket();
+
+    return;
+
+  case APP_READ_FRAME_SIZE:
+    // We just read the request length
+    // Double the buffer size until it is big enough
+    if (readWant_ > readBufferSize_) {
+      if (readBufferSize_ == 0) {
+        readBufferSize_ = 1;
+      }
+      uint32_t newSize = readBufferSize_;
+      while (readWant_ > newSize) {
+        newSize *= 2;
+      }
+
+      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
+      if (newBuffer == NULL) {
+        // nothing else to be done...
+        throw std::bad_alloc();
+      }
+      readBuffer_ = newBuffer;
+      readBufferSize_ = newSize;
+    }
+
+    readBufferPos_= 0;
+
+    // Move into read request state
+    socketState_ = SOCKET_RECV;
+    appState_ = APP_READ_REQUEST;
+
+    // Work the socket right away
+    // workSocket();
+
+    return;
+
+  case APP_CLOSE_CONNECTION:
+    server_->decrementActiveProcessors();
+    close();
+    return;
+
+  default:
+    GlobalOutput.printf("Unexpected Application State %d", appState_);
+    assert(0);
+  }
+}
+
+void TNonblockingServer::TConnection::setFlags(short eventFlags) {
+  // Catch the do nothing case
+  if (eventFlags_ == eventFlags) {
+    return;
+  }
+
+  // Delete a previously existing event
+  if (eventFlags_ != 0) {
+    if (event_del(&event_) == -1) {
+      GlobalOutput("TConnection::setFlags event_del");
+      return;
+    }
+  }
+
+  // Update in memory structure
+  eventFlags_ = eventFlags;
+
+  // Do not call event_set if there are no flags
+  if (!eventFlags_) {
+    return;
+  }
+
+  /*
+   * event_set:
+   *
+   * Prepares the event structure &event to be used in future calls to
+   * event_add() and event_del().  The event will be prepared to call the
+   * eventHandler using the 'sock' file descriptor to monitor events.
+   *
+   * The events can be either EV_READ, EV_WRITE, or both, indicating
+   * that an application can read or write from the file respectively without
+   * blocking.
+   *
+   * The eventHandler will be called with the file descriptor that triggered
+   * the event and the type of event which will be one of: EV_TIMEOUT,
+   * EV_SIGNAL, EV_READ, EV_WRITE.
+   *
+   * The additional flag EV_PERSIST makes an event_add() persistent until
+   * event_del() has been called.
+   *
+   * Once initialized, the &event struct can be used repeatedly with
+   * event_add() and event_del() and does not need to be reinitialized unless
+   * the eventHandler and/or the argument to it are to be changed.  However,
+   * when an ev structure has been added to libevent using event_add() the
+   * structure must persist until the event occurs (assuming EV_PERSIST
+   * is not set) or is removed using event_del().  You may not reuse the same
+   * ev structure for multiple monitored descriptors; each descriptor needs
+   * its own ev.
+   */
+  event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
+            TConnection::eventHandler, this);
+  event_base_set(ioThread_->getEventBase(), &event_);
+
+  // Add the event
+  if (event_add(&event_, 0) == -1) {
+    GlobalOutput("TConnection::setFlags(): could not event_add");
+  }
+}
+
+/**
+ * Closes a connection
+ */
+void TNonblockingServer::TConnection::close() {
+  // Delete the registered libevent
+  if (event_del(&event_) == -1) {
+    GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR);
+  }
+
+  if (serverEventHandler_) {
+    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
+  }
+  ioThread_ = NULL;
+
+  // Close the socket
+  tSocket_->close();
+
+  // close any factory produced transports
+  factoryInputTransport_->close();
+  factoryOutputTransport_->close();
+
+  // Give this object back to the server that owns it
+  server_->returnConnection(this);
+}
+
+void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
+    size_t readLimit,
+    size_t writeLimit) {
+  if (readLimit > 0 && readBufferSize_ > readLimit) {
+    free(readBuffer_);
+    readBuffer_ = NULL;
+    readBufferSize_ = 0;
+  }
+
+  if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
+    // just start over
+    outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
+    largestWriteBufferSize_ = 0;
+  }
+}
+
+TNonblockingServer::~TNonblockingServer() {
+  // Close any active connections (moves them to the idle connection stack)
+  while (activeConnections_.size()) {
+	  activeConnections_.front()->close();
+  }
+  // Clean up unused TConnection objects in connectionStack_
+  while (!connectionStack_.empty()) {
+    TConnection* connection = connectionStack_.top();
+    connectionStack_.pop();
+    delete connection;
+  }
+  // The TNonblockingIOThread objects have shared_ptrs to the Thread
+  // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
+  // objects (as runnable) so these objects will never deallocate without help.
+  while (!ioThreads_.empty()) {
+	  boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
+	  ioThreads_.pop_back();
+	  iot->setThread(boost::shared_ptr<Thread>());
+  }
+}
+
+/**
+ * Creates a new connection either by reusing an object off the stack or
+ * by allocating a new one entirely
+ */
+TNonblockingServer::TConnection* TNonblockingServer::createConnection(
+    THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen) {
+  // Check the stack
+  Guard g(connMutex_);
+
+  // pick an IO thread to handle this connection -- currently round robin
+  assert(nextIOThread_ < ioThreads_.size());
+  int selectedThreadIdx = nextIOThread_;
+  nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();
+
+  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
+
+  // Check the connection stack to see if we can re-use
+  TConnection* result = NULL;
+  if (connectionStack_.empty()) {
+    result = new TConnection(socket, ioThread, addr, addrLen);
+    ++numTConnections_;
+  } else {
+    result = connectionStack_.top();
+    connectionStack_.pop();
+    result->init(socket, ioThread, addr, addrLen);
+  }
+  activeConnections_.push_back(result);
+  return result;
+}
+
+/**
+ * Returns a connection to the stack
+ */
+void TNonblockingServer::returnConnection(TConnection* connection) {
+  Guard g(connMutex_);
+
+  activeConnections_.erase(std::remove(activeConnections_.begin(), activeConnections_.end(), connection), activeConnections_.end());
+
+  if (connectionStackLimit_ &&
+      (connectionStack_.size() >= connectionStackLimit_)) {
+    delete connection;
+    --numTConnections_;
+  } else {
+    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
+    connectionStack_.push(connection);
+  }
+}
+
+/**
+ * Server socket had something happen.  We accept all waiting client
+ * connections on fd and assign TConnection objects to handle those requests.
+ */
+void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
+  (void) which;
+  // Make sure that libevent didn't mess up the socket handles
+  assert(fd == serverSocket_);
+
+  // Server socket accepted a new connection
+  socklen_t addrLen;
+  sockaddr_storage addrStorage;
+  sockaddr* addrp = (sockaddr*)&addrStorage;
+  addrLen = sizeof(addrStorage);
+
+  // Going to accept a new client socket
+  THRIFT_SOCKET clientSocket;
+
+  // Accept as many new clients as possible, even though libevent signaled only
+  // one, this helps us to avoid having to go back into the libevent engine so
+  // many times
+  while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
+    // If we're overloaded, take action here
+    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
+      Guard g(connMutex_);
+      nConnectionsDropped_++;
+      nTotalConnectionsDropped_++;
+      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
+        ::THRIFT_CLOSESOCKET(clientSocket);
+        return;
+      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
+        if (!drainPendingTask()) {
+          // Nothing left to discard, so we drop connection instead.
+          ::THRIFT_CLOSESOCKET(clientSocket);
+          return;
+        }
+      }
+    }
+
+    // Explicitly set this socket to NONBLOCK mode
+    int flags;
+    if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0 ||
+        THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
+      GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ", THRIFT_GET_SOCKET_ERROR);
+      ::THRIFT_CLOSESOCKET(clientSocket);
+      return;
+    }
+
+    // Create a new TConnection for this client socket.
+    TConnection* clientConnection =
+      createConnection(clientSocket, addrp, addrLen);
+
+    // Fail fast if we could not create a TConnection object
+    if (clientConnection == NULL) {
+      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
+      ::THRIFT_CLOSESOCKET(clientSocket);
+      return;
+    }
+
+    /*
+     * Either notify the ioThread that is assigned this connection to
+     * start processing, or if it is us, we'll just ask this
+     * connection to do its initial state change here.
+     *
+     * (We need to avoid writing to our own notification pipe, to
+     * avoid possible deadlocks if the pipe is full.)
+     *
+     * The IO thread #0 is the only one that handles these listen
+     * events, so unless the connection has been assigned to thread #0
+     * we know it's not on our thread.
+     */
+    if (clientConnection->getIOThreadNumber() == 0) {
+      clientConnection->transition();
+    } else {
+      clientConnection->notifyIOThread();
+    }
+
+    // addrLen is written by the accept() call, so needs to be set before the next call.
+    addrLen = sizeof(addrStorage);
+  }
+
+
+  // Done looping accept, now we have to make sure the error is due to
+  // blocking. Any other error is a problem
+  if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
+    GlobalOutput.perror("thriftServerEventHandler: accept() ", THRIFT_GET_SOCKET_ERROR);
+  }
+}
+
+/**
+ * Creates a socket to listen on and binds it to the local port.
+ */
+void TNonblockingServer::createAndListenOnSocket() {
+  THRIFT_SOCKET s;
+
+  struct addrinfo hints, *res, *res0;
+  int error;
+
+  char port[sizeof("65536") + 1];
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_family = PF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+  sprintf(port, "%d", port_);
+
+  // Wildcard address
+  error = getaddrinfo(NULL, port, &hints, &res0);
+  if (error) {
+    throw TException("TNonblockingServer::serve() getaddrinfo " +
+                     string(THRIFT_GAI_STRERROR(error)));
+  }
+
+  // Pick the ipv6 address first since ipv4 addresses can be mapped
+  // into ipv6 space.
+  for (res = res0; res; res = res->ai_next) {
+    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
+      break;
+  }
+
+  // Create the server socket
+  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  if (s == -1) {
+    freeaddrinfo(res0);
+    throw TException("TNonblockingServer::serve() socket() -1");
+  }
+
+  #ifdef IPV6_V6ONLY
+  if (res->ai_family == AF_INET6) {
+    int zero = 0;
+    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
+      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
+    }
+  }
+  #endif // #ifdef IPV6_V6ONLY
+
+
+  int one = 1;
+
+  // Set THRIFT_NO_SOCKET_CACHING to avoid 2MSL delay on server restart
+  setsockopt(s, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, const_cast_sockopt(&one), sizeof(one));
+
+  if (::bind(s, res->ai_addr, static_cast<int>(res->ai_addrlen)) == -1) {
+    ::THRIFT_CLOSESOCKET(s);
+    freeaddrinfo(res0);
+    throw TTransportException(TTransportException::NOT_OPEN,
+                              "TNonblockingServer::serve() bind",
+                              THRIFT_GET_SOCKET_ERROR);
+  }
+
+  // Done with the addr info
+  freeaddrinfo(res0);
+
+  // Set up this file descriptor for listening
+  listenSocket(s);
+}
+
+/**
+ * Takes a socket created by listenSocket() and sets various options on it
+ * to prepare for use in the server.
+ */
+void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
+  // Set socket to nonblocking mode
+  int flags;
+  if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0 ||
+      THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
+    ::THRIFT_CLOSESOCKET(s);
+    throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
+  }
+
+  int one = 1;
+  struct linger ling = {0, 0};
+
+  // Keepalive to ensure full result flushing
+  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
+
+  // Turn linger off to avoid hung sockets
+  setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
+
+  // Set TCP nodelay if available, MAC OS X Hack
+  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+  #ifndef TCP_NOPUSH
+  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
+  #endif
+
+  #ifdef TCP_LOW_MIN_RTO
+  if (TSocket::getUseLowMinRto()) {
+    setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
+  }
+  #endif
+
+  if (listen(s, LISTEN_BACKLOG) == -1) {
+    ::THRIFT_CLOSESOCKET(s);
+    throw TException("TNonblockingServer::serve() listen");
+  }
+
+  // Cool, this socket is good to go, set it as the serverSocket_
+  serverSocket_ = s;
+}
+
+void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+  threadManager_ = threadManager;
+  if (threadManager) {
+    threadManager->setExpireCallback(apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose, this, apache::thrift::stdcxx::placeholders::_1));
+    threadPoolProcessing_ = true;
+  } else {
+    threadPoolProcessing_ = false;
+  }
+}
+
+bool  TNonblockingServer::serverOverloaded() {
+  size_t activeConnections = numTConnections_ - connectionStack_.size();
+  if (numActiveProcessors_ > maxActiveProcessors_ ||
+      activeConnections > maxConnections_) {
+    if (!overloaded_) {
+       GlobalOutput.printf("TNonblockingServer: overload condition begun.");
+      overloaded_ = true;
+    }
+  } else {
+    if (overloaded_ &&
+        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
+        (activeConnections <= overloadHysteresis_ * maxConnections_)) {
+      GlobalOutput.printf("TNonblockingServer: overload ended; "
+                          "%u dropped (%llu total)",
+                          nConnectionsDropped_, nTotalConnectionsDropped_);
+      nConnectionsDropped_ = 0;
+      overloaded_ = false;
+    }
+  }
+
+  return overloaded_;
+}
+
+bool TNonblockingServer::drainPendingTask() {
+  if (threadManager_) {
+    boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
+    if (task) {
+      TConnection* connection =
+        static_cast<TConnection::Task*>(task.get())->getTConnection();
+      assert(connection && connection->getServer()
+             && connection->getState() == APP_WAIT_TASK);
+      connection->forceClose();
+      return true;
+    }
+  }
+  return false;
+}
+
+void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
+  TConnection* connection =
+    static_cast<TConnection::Task*>(task.get())->getTConnection();
+  assert(connection && connection->getServer() &&
+         connection->getState() == APP_WAIT_TASK);
+  connection->forceClose();
+}
+
+void TNonblockingServer::stop() {
+  // Breaks the event loop in all threads so that they end ASAP.
+  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
+    ioThreads_[i]->stop();
+  }
+}
+
+void TNonblockingServer::registerEvents(event_base* user_event_base) {
+  userEventBase_ = user_event_base;
+
+  // init listen socket
+  if (serverSocket_ == THRIFT_INVALID_SOCKET)
+    createAndListenOnSocket();
+
+  // set up the IO threads
+  assert(ioThreads_.empty());
+  if (!numIOThreads_) {
+    numIOThreads_ = DEFAULT_IO_THREADS;
+  }
+
+  for (uint32_t id = 0; id < numIOThreads_; ++id) {
+    // the first IO thread also does the listening on server socket
+    THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
+
+    shared_ptr<TNonblockingIOThread> thread(
+      new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
+    ioThreads_.push_back(thread);
+  }
+
+  // Notify handler of the preServe event
+  if (eventHandler_) {
+    eventHandler_->preServe();
+  }
+
+  // Start all of our helper IO threads. Note that the threads run forever,
+  // only terminating if stop() is called.
+  assert(ioThreads_.size() == numIOThreads_);
+  assert(ioThreads_.size() > 0);
+
+  GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
+               port_, ioThreads_.size());
+
+  // Launch all the secondary IO threads in separate threads
+  if (ioThreads_.size() > 1) {
+    ioThreadFactory_.reset(new PlatformThreadFactory(
+#if !defined(USE_BOOST_THREAD) && !defined(USE_STD_THREAD)
+      PlatformThreadFactory::OTHER,  // scheduler
+      PlatformThreadFactory::NORMAL, // priority
+      1,                          // stack size (MB)
+#endif
+      false                       // detached
+    ));
+
+    assert(ioThreadFactory_.get());
+
+    // intentionally starting at thread 1, not 0
+    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
+      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
+      ioThreads_[i]->setThread(thread);
+      thread->start();
+    }
+  }
+
+  // Register the events for the primary (listener) IO thread
+  ioThreads_[0]->registerEvents();
+}
+
+/**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
+void TNonblockingServer::serve() {
+
+  registerEvents(NULL);
+
+  // Run the primary (listener) IO thread loop in our main thread; this will
+  // only return when the server is shutting down.
+  ioThreads_[0]->run();
+
+  // Ensure all threads are finished before exiting serve()
+  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
+    ioThreads_[i]->join();
+    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
+  }
+}
+
+TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
+                                           int number,
+                                           THRIFT_SOCKET listenSocket,
+                                           bool useHighPriority)
+      : server_(server)
+      , number_(number)
+      , listenSocket_(listenSocket)
+      , useHighPriority_(useHighPriority)
+      , eventBase_(NULL)
+      , ownEventBase_(false) {
+  notificationPipeFDs_[0] = -1;
+  notificationPipeFDs_[1] = -1;
+}
+
+TNonblockingIOThread::~TNonblockingIOThread() {
+  // make sure our associated thread is fully finished
+  join();
+
+  if (eventBase_ && ownEventBase_) {
+    event_base_free(eventBase_);
+    ownEventBase_ = false;
+  }
+
+  if (listenSocket_ >= 0) {
+    if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
+      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
+                          THRIFT_GET_SOCKET_ERROR);
+    }
+    listenSocket_ = THRIFT_INVALID_SOCKET;
+  }
+
+  for (int i = 0; i < 2; ++i) {
+    if (notificationPipeFDs_[i] >= 0) {
+      if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFDs_[i])) {
+        GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
+                            THRIFT_GET_SOCKET_ERROR);
+      }
+      notificationPipeFDs_[i] = THRIFT_INVALID_SOCKET;
+    }
+  }
+}
+
+void TNonblockingIOThread::createNotificationPipe() {
+  if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
+    throw TException("can't create notification pipe");
+  }
+  if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
+     evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
+    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
+    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
+    throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
+  }
+  for (int i = 0; i < 2; ++i) {
+#if LIBEVENT_VERSION_NUMBER < 0x02000000
+    int flags;
+    if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
+        THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+#else
+    if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
+#endif
+      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
+      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
+      throw TException("TNonblockingServer::createNotificationPipe() "
+        "FD_CLOEXEC");
+    }
+  }
+}
+
+/**
+ * Register the core libevent events onto the proper base.
+ */
+void TNonblockingIOThread::registerEvents() {
+  threadId_ = Thread::get_current();
+
+  assert(eventBase_ == 0);
+  eventBase_ = getServer()->getUserEventBase();
+  if (eventBase_ == NULL) {
+    eventBase_ = event_base_new();
+    ownEventBase_ = true;
+  }
+
+  // Print some libevent stats
+  if (number_ == 0) {
+    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
+            event_get_version(),
+            event_base_get_method(eventBase_));
+  }
+
+  if (listenSocket_ >= 0) {
+    // Register the server event
+    event_set(&serverEvent_,
+              listenSocket_,
+              EV_READ | EV_PERSIST,
+              TNonblockingIOThread::listenHandler,
+              server_);
+    event_base_set(eventBase_, &serverEvent_);
+
+    // Add the event and start up the server
+    if (-1 == event_add(&serverEvent_, 0)) {
+      throw TException("TNonblockingServer::serve(): "
+                       "event_add() failed on server listen event");
+    }
+    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
+                        number_);
+  }
+
+  createNotificationPipe();
+
+  // Create an event to be notified when a task finishes
+  event_set(&notificationEvent_,
+            getNotificationRecvFD(),
+            EV_READ | EV_PERSIST,
+            TNonblockingIOThread::notifyHandler,
+            this);
+
+  // Attach to the base
+  event_base_set(eventBase_, &notificationEvent_);
+
+  // Add the event and start up the server
+  if (-1 == event_add(&notificationEvent_, 0)) {
+    throw TException("TNonblockingServer::serve(): "
+                     "event_add() failed on task-done notification event");
+  }
+  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
+                      number_);
+}
+
+bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
+  THRIFT_SOCKET fd = getNotificationSendFD();
+  if (fd < 0) {
+    return false;
+  }
+
+  const int kSize = sizeof(conn);
+  if (send(fd, const_cast_sockopt(&conn), kSize, 0) != kSize) {
+    return false;
+  }
+
+  return true;
+}
+
+/* static */
+void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
+  TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
+  assert(ioThread);
+  (void)which;
+
+  while (true) {
+    TNonblockingServer::TConnection* connection = 0;
+    const int kSize = sizeof(connection);
+    int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
+    if (nBytes == kSize) {
+      if (connection == NULL) {
+        // this is the command to stop our thread, exit the handler!
+        return;
+      }
+      connection->transition();
+    } else if (nBytes > 0) {
+      // throw away these bytes and hope that next time we get a solid read
+      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
+                          nBytes, kSize);
+      ioThread->breakLoop(true);
+      return;
+    } else if (nBytes == 0) {
+      GlobalOutput.printf("notifyHandler: Notify socket closed!");
+      // exit the loop
+      break;
+    } else { // nBytes < 0
+      if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
+          GlobalOutput.perror(
+            "TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
+          ioThread->breakLoop(true);
+          return;
+      }
+      // exit the loop
+      break;
+    }
+  }
+}
+
+void TNonblockingIOThread::breakLoop(bool error) {
+  if (error) {
+    GlobalOutput.printf(
+      "TNonblockingServer: IO thread #%d exiting with error.", number_);
+    // TODO: figure out something better to do here, but for now kill the
+    // whole process.
+    GlobalOutput.printf("TNonblockingServer: aborting process.");
+    ::abort();
+  }
+
+  // sets a flag so that the loop exits on the next event
+  event_base_loopbreak(eventBase_);
+
+  // event_base_loopbreak() only causes the loop to exit the next time
+  // it wakes up.  We need to force it to wake up, in case there are
+  // no real events it needs to process.
+  //
+  // If we're running in the same thread, we can't use the notify(0)
+  // mechanism to stop the thread, but happily if we're running in the
+  // same thread, this means the thread can't be blocking in the event
+  // loop either.
+  if (!Thread::is_current(threadId_)) {
+    notify(NULL);
+  }
+}
+
+void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
+#ifdef HAVE_SCHED_H
+  // Start out with a standard, low-priority setup for the sched params.
+  struct sched_param sp;
+  bzero((void*) &sp, sizeof(sp));
+  int policy = SCHED_OTHER;
+
+  // If desired, set up high-priority sched params structure.
+  if (value) {
+    // FIFO scheduler, ranked above default SCHED_OTHER queue
+    policy = SCHED_FIFO;
+    // The priority only compares us to other SCHED_FIFO threads, so we
+    // just pick a random priority halfway between min & max.
+    const int priority = (sched_get_priority_max(policy) +
+                          sched_get_priority_min(policy)) / 2;
+
+    sp.sched_priority = priority;
+  }
+
+  // Actually set the sched params for the current thread.
+  if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
+    GlobalOutput.printf(
+      "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
+  } else {
+    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
+  }
+#else
+  THRIFT_UNUSED_VARIABLE(value);
+#endif
+}
+
+void TNonblockingIOThread::run() {
+  if (eventBase_ == NULL)
+    registerEvents();
+
+  GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
+                      number_);
+
+  if (useHighPriority_) {
+    setCurrentThreadHighPriority(true);
+  }
+
+  // Run libevent engine, never returns, invokes calls to eventHandler
+  event_base_loop(eventBase_, 0);
+
+  if (useHighPriority_) {
+    setCurrentThreadHighPriority(false);
+  }
+
+  // cleans up our registered events
+  cleanupEvents();
+
+  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
+    number_);
+}
+
+void TNonblockingIOThread::cleanupEvents() {
+  // stop the listen socket, if any
+  if (listenSocket_ >= 0) {
+    if (event_del(&serverEvent_) == -1) {
+      GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
+    }
+  }
+
+  event_del(&notificationEvent_);
+}
+
+
+void TNonblockingIOThread::stop() {
+  // This should cause the thread to fall out of its event loop ASAP.
+  breakLoop(false);
+}
+
+void TNonblockingIOThread::join() {
+  // If this was a thread created by a factory (not the thread that called
+  // serve()), we join() it to make sure we shut down fully.
+  if (thread_) {
+    try {
+      // Note that it is safe to both join() ourselves twice, as well as join
+      // the current thread as the pthread implementation checks for deadlock.
+      thread_->join();
+    } catch(...) {
+      // swallow everything
+    }
+  }
+}
+
+}}} // apache::thrift::server

http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.h
new file mode 100644
index 0000000..532d4ae
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TNonblockingServer.h
@@ -0,0 +1,944 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
+#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
+
+#include <thrift/Thrift.h>
+#include <thrift/server/TServer.h>
+#include <thrift/transport/PlatformSocket.h>
+#include <thrift/transport/TBufferTransports.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/concurrency/ThreadManager.h>
+#include <climits>
+#include <thrift/concurrency/Thread.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/Mutex.h>
+#include <stack>
+#include <vector>
+#include <string>
+#include <cstdlib>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <event.h>
+
+
+
+namespace apache { namespace thrift { namespace server {
+
+using apache::thrift::transport::TMemoryBuffer;
+using apache::thrift::transport::TSocket;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::concurrency::Runnable;
+using apache::thrift::concurrency::ThreadManager;
+using apache::thrift::concurrency::PlatformThreadFactory;
+using apache::thrift::concurrency::ThreadFactory;
+using apache::thrift::concurrency::Thread;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Guard;
+
+#ifdef LIBEVENT_VERSION_NUMBER
+#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
+#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
+#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
+#else
+// assume latest version 1 series
+#define LIBEVENT_VERSION_MAJOR 1
+#define LIBEVENT_VERSION_MINOR 14
+#define LIBEVENT_VERSION_REL 13
+#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
+#endif
+
+#if LIBEVENT_VERSION_NUMBER < 0x02000000
+ typedef THRIFT_SOCKET evutil_socket_t;
+#endif
+
+#ifndef SOCKOPT_CAST_T
+#   ifndef _WIN32
+#       define SOCKOPT_CAST_T void
+#   else
+#       define SOCKOPT_CAST_T char
+#   endif // _WIN32
+#endif
+
+template<class T>
+inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
+  return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
+}
+
+template<class T>
+inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
+  return reinterpret_cast<SOCKOPT_CAST_T*>(v);
+}
+
+/**
+ * This is a non-blocking server in C++ for high performance that
+ * operates a set of IO threads (by default only one). It assumes that
+ * all incoming requests are framed with a 4 byte length indicator and
+ * writes out responses using the same framing.
+ *
+ * It does not use the TServerTransport framework, but rather has socket
+ * operations hardcoded for use with select.
+ *
+ */
+
+
+/// Overload condition actions.
+enum TOverloadAction {
+  T_OVERLOAD_NO_ACTION,        ///< Don't handle overload */
+  T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately */
+  T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue */
+};
+
+class TNonblockingIOThread;
+
+class TNonblockingServer : public TServer {
+ private:
+  class TConnection;
+
+  friend class TNonblockingIOThread;
+ private:
+  /// Listen backlog
+  static const int LISTEN_BACKLOG = 1024;
+
+  /// Default limit on size of idle connection pool
+  static const size_t CONNECTION_STACK_LIMIT = 1024;
+
+  /// Default limit on frame size
+  static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
+
+  /// Default limit on total number of connected sockets
+  static const int MAX_CONNECTIONS = INT_MAX;
+
+  /// Default limit on connections in handler/task processing
+  static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
+
+  /// Default size of write buffer
+  static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
+
+  /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
+  static const int IDLE_READ_BUFFER_LIMIT = 1024;
+
+  /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
+  static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
+
+  /// # of calls before resizing oversized buffers (0 = check only on close)
+  static const int RESIZE_BUFFER_EVERY_N = 512;
+
+  /// # of IO threads to use by default
+  static const int DEFAULT_IO_THREADS = 1;
+
+  /// # of IO threads this server will use
+  size_t numIOThreads_;
+
+  /// Whether to set high scheduling priority for IO threads
+  bool useHighPriorityIOThreads_;
+
+  /// Server socket file descriptor
+  THRIFT_SOCKET serverSocket_;
+
+  /// Port server runs on
+  int port_;
+
+  /// The optional user-provided event-base (for single-thread servers)
+  event_base* userEventBase_;
+
+  /// For processing via thread pool, may be NULL
+  boost::shared_ptr<ThreadManager> threadManager_;
+
+  /// Is thread pool processing?
+  bool threadPoolProcessing_;
+
+  // Factory to create the IO threads
+  boost::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
+
+  // Vector of IOThread objects that will handle our IO
+  std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
+
+  // Index of next IO Thread to be used (for round-robin)
+  uint32_t nextIOThread_;
+
+  // Synchronizes access to connection stack and similar data
+  Mutex connMutex_;
+
+  /// Number of TConnection object we've created
+  size_t numTConnections_;
+
+  /// Number of Connections processing or waiting to process
+  size_t numActiveProcessors_;
+
+  /// Limit for how many TConnection objects to cache
+  size_t connectionStackLimit_;
+
+  /// Limit for number of connections processing or waiting to process
+  size_t maxActiveProcessors_;
+
+  /// Limit for number of open connections
+  size_t maxConnections_;
+
+  /// Limit for frame size
+  size_t maxFrameSize_;
+
+  /// Time in milliseconds before an unperformed task expires (0 == infinite).
+  int64_t taskExpireTime_;
+
+  /**
+   * Hysteresis for overload state.  This is the fraction of the overload
+   * value that needs to be reached before the overload state is cleared;
+   * must be <= 1.0.
+   */
+  double overloadHysteresis_;
+
+  /// Action to take when we're overloaded.
+  TOverloadAction overloadAction_;
+
+  /**
+   * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
+   * and found to be exceeded, reinitialized) to this size.
+   */
+  size_t writeBufferDefaultSize_;
+
+  /**
+   * Max read buffer size for an idle TConnection.  When we place an idle
+   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
+   * we will free the buffer (such that it will be reinitialized by the next
+   * received frame) if it has exceeded this limit.  0 disables this check.
+   */
+  size_t idleReadBufferLimit_;
+
+  /**
+   * Max write buffer size for an idle connection.  When we place an idle
+   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
+   * we insure that its write buffer is <= to this size; otherwise we
+   * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
+   * idle connections don't hog memory. 0 disables this check.
+   */
+  size_t idleWriteBufferLimit_;
+
+  /**
+   * Every N calls we check the buffer size limits on a connected TConnection.
+   * 0 disables (i.e. the checks are only done when a connection closes).
+   */
+  int32_t resizeBufferEveryN_;
+
+  /// Set if we are currently in an overloaded state.
+  bool overloaded_;
+
+  /// Count of connections dropped since overload started
+  uint32_t nConnectionsDropped_;
+
+  /// Count of connections dropped on overload since server started
+  uint64_t nTotalConnectionsDropped_;
+
+  /**
+   * This is a stack of all the objects that have been created but that
+   * are NOT currently in use. When we close a connection, we place it on this
+   * stack so that the object can be reused later, rather than freeing the
+   * memory and reallocating a new object later.
+   */
+  std::stack<TConnection*> connectionStack_;
+
+  /**
+   * This container holds pointers to all active connections. This container
+   * allows the server to clean up unlcosed connection objects at destruction,
+   * which in turn allows their transports, protocols, processors and handlers
+   * to deallocate and clean up correctly.
+   */
+  std::vector<TConnection*> activeConnections_;
+
+  /**
+   * Called when server socket had something happen.  We accept all waiting
+   * client connections on listen socket fd and assign TConnection objects
+   * to handle those requests.
+   *
+   * @param fd the listen socket.
+   * @param which the event flag that triggered the handler.
+   */
+  void handleEvent(THRIFT_SOCKET fd, short which);
+
+  void init(int port) {
+    serverSocket_ = THRIFT_INVALID_SOCKET;
+    numIOThreads_ = DEFAULT_IO_THREADS;
+    nextIOThread_ = 0;
+    useHighPriorityIOThreads_ = false;
+    port_ = port;
+    userEventBase_ = NULL;
+    threadPoolProcessing_ = false;
+    numTConnections_ = 0;
+    numActiveProcessors_ = 0;
+    connectionStackLimit_ = CONNECTION_STACK_LIMIT;
+    maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
+    maxConnections_ = MAX_CONNECTIONS;
+    maxFrameSize_ = MAX_FRAME_SIZE;
+    taskExpireTime_ = 0;
+    overloadHysteresis_ = 0.8;
+    overloadAction_ = T_OVERLOAD_NO_ACTION;
+    writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
+    idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
+    idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
+    resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
+    overloaded_ = false;
+    nConnectionsDropped_ = 0;
+    nTotalConnectionsDropped_ = 0;
+  }
+
+ public:
+  template<typename ProcessorFactory>
+  TNonblockingServer(
+      const boost::shared_ptr<ProcessorFactory>& processorFactory,
+      int port,
+      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
+    TServer(processorFactory) {
+    init(port);
+  }
+
+  template<typename Processor>
+  TNonblockingServer(const boost::shared_ptr<Processor>& processor,
+                     int port,
+                     THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
+    TServer(processor) {
+    init(port);
+  }
+
+  template<typename ProcessorFactory>
+  TNonblockingServer(
+      const boost::shared_ptr<ProcessorFactory>& processorFactory,
+      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+      int port,
+      const boost::shared_ptr<ThreadManager>& threadManager =
+        boost::shared_ptr<ThreadManager>(),
+      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
+    TServer(processorFactory) {
+
+    init(port);
+
+    setInputProtocolFactory(protocolFactory);
+    setOutputProtocolFactory(protocolFactory);
+    setThreadManager(threadManager);
+  }
+
+  template<typename Processor>
+  TNonblockingServer(
+      const boost::shared_ptr<Processor>& processor,
+      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+      int port,
+      const boost::shared_ptr<ThreadManager>& threadManager =
+        boost::shared_ptr<ThreadManager>(),
+      THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
+    TServer(processor) {
+
+    init(port);
+
+    setInputProtocolFactory(protocolFactory);
+    setOutputProtocolFactory(protocolFactory);
+    setThreadManager(threadManager);
+  }
+
+  template<typename ProcessorFactory>
+  TNonblockingServer(
+      const boost::shared_ptr<ProcessorFactory>& processorFactory,
+      const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
+      const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
+      const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
+      const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
+      int port,
+      const boost::shared_ptr<ThreadManager>& threadManager =
+        boost::shared_ptr<ThreadManager>(),
+      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
+    TServer(processorFactory) {
+
+    init(port);
+
+    setInputTransportFactory(inputTransportFactory);
+    setOutputTransportFactory(outputTransportFactory);
+    setInputProtocolFactory(inputProtocolFactory);
+    setOutputProtocolFactory(outputProtocolFactory);
+    setThreadManager(threadManager);
+  }
+
+  template<typename Processor>
+  TNonblockingServer(
+      const boost::shared_ptr<Processor>& processor,
+      const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
+      const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
+      const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
+      const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
+      int port,
+      const boost::shared_ptr<ThreadManager>& threadManager =
+        boost::shared_ptr<ThreadManager>(),
+      THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
+    TServer(processor) {
+
+    init(port);
+
+    setInputTransportFactory(inputTransportFactory);
+    setOutputTransportFactory(outputTransportFactory);
+    setInputProtocolFactory(inputProtocolFactory);
+    setOutputProtocolFactory(outputProtocolFactory);
+    setThreadManager(threadManager);
+  }
+
+  ~TNonblockingServer();
+
+  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
+
+  boost::shared_ptr<ThreadManager> getThreadManager() {
+    return threadManager_;
+  }
+
+  /**
+   * Sets the number of IO threads used by this server. Can only be used before
+   * the call to serve() and has no effect afterwards.  We always use a
+   * PosixThreadFactory for the IO worker threads, because they must joinable
+   * for clean shutdown.
+   */
+  void setNumIOThreads(size_t numThreads) {
+    numIOThreads_ = numThreads;
+  }
+
+  /** Return whether the IO threads will get high scheduling priority */
+  bool useHighPriorityIOThreads() const {
+    return useHighPriorityIOThreads_;
+  }
+
+  /** Set whether the IO threads will get high scheduling priority. */
+  void setUseHighPriorityIOThreads(bool val) {
+    useHighPriorityIOThreads_ = val;
+  }
+
+  /** Return the number of IO threads used by this server. */
+  size_t getNumIOThreads() const {
+    return numIOThreads_;
+  }
+
+  /**
+   * Get the maximum number of unused TConnection we will hold in reserve.
+   *
+   * @return the current limit on TConnection pool size.
+   */
+  size_t getConnectionStackLimit() const {
+    return connectionStackLimit_;
+  }
+
+  /**
+   * Set the maximum number of unused TConnection we will hold in reserve.
+   *
+   * @param sz the new limit for TConnection pool size.
+   */
+  void setConnectionStackLimit(size_t sz) {
+    connectionStackLimit_ = sz;
+  }
+
+  bool isThreadPoolProcessing() const {
+    return threadPoolProcessing_;
+  }
+
+  void addTask(boost::shared_ptr<Runnable> task) {
+    threadManager_->add(task, 0LL, taskExpireTime_);
+  }
+
+  /**
+   * Return the count of sockets currently connected to.
+   *
+   * @return count of connected sockets.
+   */
+  size_t getNumConnections() const {
+    return numTConnections_;
+  }
+
+  /**
+   * Return the count of sockets currently connected to.
+   *
+   * @return count of connected sockets.
+   */
+  size_t getNumActiveConnections() const {
+    return getNumConnections() - getNumIdleConnections();
+  }
+
+  /**
+   * Return the count of connection objects allocated but not in use.
+   *
+   * @return count of idle connection objects.
+   */
+  size_t getNumIdleConnections() const {
+    return connectionStack_.size();
+  }
+
+  /**
+   * Return count of number of connections which are currently processing.
+   * This is defined as a connection where all data has been received and
+   * either assigned a task (when threading) or passed to a handler (when
+   * not threading), and where the handler has not yet returned.
+   *
+   * @return # of connections currently processing.
+   */
+  size_t getNumActiveProcessors() const {
+    return numActiveProcessors_;
+  }
+
+  /// Increment the count of connections currently processing.
+  void incrementActiveProcessors() {
+    Guard g(connMutex_);
+    ++numActiveProcessors_;
+  }
+
+  /// Decrement the count of connections currently processing.
+  void decrementActiveProcessors() {
+    Guard g(connMutex_);
+    if (numActiveProcessors_ > 0) {
+      --numActiveProcessors_;
+    }
+  }
+
+  /**
+   * Get the maximum # of connections allowed before overload.
+   *
+   * @return current setting.
+   */
+  size_t getMaxConnections() const {
+    return maxConnections_;
+  }
+
+  /**
+   * Set the maximum # of connections allowed before overload.
+   *
+   * @param maxConnections new setting for maximum # of connections.
+   */
+  void setMaxConnections(size_t maxConnections) {
+    maxConnections_ = maxConnections;
+  }
+
+  /**
+   * Get the maximum # of connections waiting in handler/task before overload.
+   *
+   * @return current setting.
+   */
+  size_t getMaxActiveProcessors() const {
+    return maxActiveProcessors_;
+  }
+
+  /**
+   * Set the maximum # of connections waiting in handler/task before overload.
+   *
+   * @param maxActiveProcessors new setting for maximum # of active processes.
+   */
+  void setMaxActiveProcessors(size_t maxActiveProcessors) {
+    maxActiveProcessors_ = maxActiveProcessors;
+  }
+
+  /**
+   * Get the maximum allowed frame size.
+   *
+   * If a client tries to send a message larger than this limit,
+   * its connection will be closed.
+   *
+   * @return Maxium frame size, in bytes.
+   */
+  size_t getMaxFrameSize() const {
+    return maxFrameSize_;
+  }
+
+  /**
+   * Set the maximum allowed frame size.
+   *
+   * @param maxFrameSize The new maximum frame size.
+   */
+  void setMaxFrameSize(size_t maxFrameSize) {
+    maxFrameSize_ = maxFrameSize;
+  }
+
+  /**
+   * Get fraction of maximum limits before an overload condition is cleared.
+   *
+   * @return hysteresis fraction
+   */
+  double getOverloadHysteresis() const {
+    return overloadHysteresis_;
+  }
+
+  /**
+   * Set fraction of maximum limits before an overload condition is cleared.
+   * A good value would probably be between 0.5 and 0.9.
+   *
+   * @param hysteresisFraction fraction <= 1.0.
+   */
+  void setOverloadHysteresis(double hysteresisFraction) {
+    if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
+      overloadHysteresis_ = hysteresisFraction;
+    }
+  }
+
+  /**
+   * Get the action the server will take on overload.
+   *
+   * @return a TOverloadAction enum value for the currently set action.
+   */
+  TOverloadAction getOverloadAction() const {
+    return overloadAction_;
+  }
+
+  /**
+   * Set the action the server is to take on overload.
+   *
+   * @param overloadAction a TOverloadAction enum value for the action.
+   */
+  void setOverloadAction(TOverloadAction overloadAction) {
+    overloadAction_ = overloadAction;
+  }
+
+  /**
+   * Get the time in milliseconds after which a task expires (0 == infinite).
+   *
+   * @return a 64-bit time in milliseconds.
+   */
+  int64_t getTaskExpireTime() const {
+    return taskExpireTime_;
+  }
+
+  /**
+   * Set the time in milliseconds after which a task expires (0 == infinite).
+   *
+   * @param taskExpireTime a 64-bit time in milliseconds.
+   */
+  void setTaskExpireTime(int64_t taskExpireTime) {
+    taskExpireTime_ = taskExpireTime;
+  }
+
+  /**
+   * Determine if the server is currently overloaded.
+   * This function checks the maximums for open connections and connections
+   * currently in processing, and sets an overload condition if they are
+   * exceeded.  The overload will persist until both values are below the
+   * current hysteresis fraction of their maximums.
+   *
+   * @return true if an overload condition exists, false if not.
+   */
+  bool serverOverloaded();
+
+  /** Pop and discard next task on threadpool wait queue.
+   *
+   * @return true if a task was discarded, false if the wait queue was empty.
+   */
+  bool drainPendingTask();
+
+  /**
+   * Get the starting size of a TConnection object's write buffer.
+   *
+   * @return # bytes we initialize a TConnection object's write buffer to.
+   */
+  size_t getWriteBufferDefaultSize() const {
+    return writeBufferDefaultSize_;
+  }
+
+  /**
+   * Set the starting size of a TConnection object's write buffer.
+   *
+   * @param size # bytes we initialize a TConnection object's write buffer to.
+   */
+  void setWriteBufferDefaultSize(size_t size) {
+    writeBufferDefaultSize_ = size;
+  }
+
+  /**
+   * Get the maximum size of read buffer allocated to idle TConnection objects.
+   *
+   * @return # bytes beyond which we will dealloc idle buffer.
+   */
+  size_t getIdleReadBufferLimit() const {
+    return idleReadBufferLimit_;
+  }
+
+  /**
+   * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
+   * Get the maximum size of read buffer allocated to idle TConnection objects.
+   *
+   * @return # bytes beyond which we will dealloc idle buffer.
+   */
+  size_t getIdleBufferMemLimit() const {
+    return idleReadBufferLimit_;
+  }
+
+  /**
+   * Set the maximum size read buffer allocated to idle TConnection objects.
+   * If a TConnection object is found (either on connection close or between
+   * calls when resizeBufferEveryN_ is set) with more than this much memory
+   * allocated to its read buffer, we free it and allow it to be reinitialized
+   * on the next received frame.
+   *
+   * @param limit of bytes beyond which we will shrink buffers when checked.
+   */
+  void setIdleReadBufferLimit(size_t limit) {
+    idleReadBufferLimit_ = limit;
+  }
+
+  /**
+   * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
+   * Set the maximum size read buffer allocated to idle TConnection objects.
+   * If a TConnection object is found (either on connection close or between
+   * calls when resizeBufferEveryN_ is set) with more than this much memory
+   * allocated to its read buffer, we free it and allow it to be reinitialized
+   * on the next received frame.
+   *
+   * @param limit of bytes beyond which we will shrink buffers when checked.
+   */
+  void setIdleBufferMemLimit(size_t limit) {
+    idleReadBufferLimit_ = limit;
+  }
+
+
+
+  /**
+   * Get the maximum size of write buffer allocated to idle TConnection objects.
+   *
+   * @return # bytes beyond which we will reallocate buffers when checked.
+   */
+  size_t getIdleWriteBufferLimit() const {
+    return idleWriteBufferLimit_;
+  }
+
+  /**
+   * Set the maximum size write buffer allocated to idle TConnection objects.
+   * If a TConnection object is found (either on connection close or between
+   * calls when resizeBufferEveryN_ is set) with more than this much memory
+   * allocated to its write buffer, we destroy and construct that buffer with
+   * writeBufferDefaultSize_ bytes.
+   *
+   * @param limit of bytes beyond which we will shrink buffers when idle.
+   */
+  void setIdleWriteBufferLimit(size_t limit) {
+    idleWriteBufferLimit_ = limit;
+  }
+
+  /**
+   * Get # of calls made between buffer size checks.  0 means disabled.
+   *
+   * @return # of calls between buffer size checks.
+   */
+  int32_t getResizeBufferEveryN() const {
+    return resizeBufferEveryN_;
+  }
+
+  /**
+   * Check buffer sizes every "count" calls.  This allows buffer limits
+   * to be enforced for persistant connections with a controllable degree
+   * of overhead. 0 disables checks except at connection close.
+   *
+   * @param count the number of calls between checks, or 0 to disable
+   */
+  void setResizeBufferEveryN(int32_t count) {
+    resizeBufferEveryN_ = count;
+  }
+
+  /**
+   * Main workhorse function, starts up the server listening on a port and
+   * loops over the libevent handler.
+   */
+  void serve();
+
+  /**
+   * Causes the server to terminate gracefully (can be called from any thread).
+   */
+  void stop();
+
+  /// Creates a socket to listen on and binds it to the local port.
+  void createAndListenOnSocket();
+
+  /**
+   * Takes a socket created by createAndListenOnSocket() and sets various
+   * options on it to prepare for use in the server.
+   *
+   * @param fd descriptor of socket to be initialized/
+   */
+  void listenSocket(THRIFT_SOCKET fd);
+
+  /**
+   * Register the optional user-provided event-base (for single-thread servers)
+   *
+   * This method should be used when the server is running in a single-thread
+   * mode, and the event base is provided by the user (i.e., the caller).
+   *
+   * @param user_event_base the user-provided event-base. The user is
+   * responsible for freeing the event base memory.
+   */
+  void registerEvents(event_base* user_event_base);
+
+  /**
+   * Returns the optional user-provided event-base (for single-thread servers).
+   */
+  event_base* getUserEventBase() const { return userEventBase_; }
+
+ private:
+  /**
+   * Callback function that the threadmanager calls when a task reaches
+   * its expiration time.  It is needed to clean up the expired connection.
+   *
+   * @param task the runnable associated with the expired task.
+   */
+  void expireClose(boost::shared_ptr<Runnable> task);
+
+  /**
+   * Return an initialized connection object.  Creates or recovers from
+   * pool a TConnection and initializes it with the provided socket FD
+   * and flags.
+   *
+   * @param socket FD of socket associated with this connection.
+   * @param addr the sockaddr of the client
+   * @param addrLen the length of addr
+   * @return pointer to initialized TConnection object.
+   */
+  TConnection* createConnection(THRIFT_SOCKET socket, const sockaddr* addr,
+                                            socklen_t addrLen);
+
+  /**
+   * Returns a connection to pool or deletion.  If the connection pool
+   * (a stack) isn't full, place the connection object on it, otherwise
+   * just delete it.
+   *
+   * @param connection the TConection being returned.
+   */
+  void returnConnection(TConnection* connection);
+};
+
+class TNonblockingIOThread : public Runnable {
+ public:
+  // Creates an IO thread and sets up the event base.  The listenSocket should
+  // be a valid FD on which listen() has already been called.  If the
+  // listenSocket is < 0, accepting will not be done.
+  TNonblockingIOThread(TNonblockingServer* server,
+                       int number,
+                       THRIFT_SOCKET listenSocket,
+                       bool useHighPriority);
+
+  ~TNonblockingIOThread();
+
+  // Returns the event-base for this thread.
+  event_base* getEventBase() const { return eventBase_; }
+
+  // Returns the server for this thread.
+  TNonblockingServer* getServer() const { return server_; }
+
+  // Returns the number of this IO thread.
+  int getThreadNumber() const { return number_; }
+
+  // Returns the thread id associated with this object.  This should
+  // only be called after the thread has been started.
+  Thread::id_t getThreadId() const { return threadId_; }
+
+  // Returns the send-fd for task complete notifications.
+  evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
+
+  // Returns the read-fd for task complete notifications.
+  evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
+
+  // Returns the actual thread object associated with this IO thread.
+  boost::shared_ptr<Thread> getThread() const { return thread_; }
+
+  // Sets the actual thread object associated with this IO thread.
+  void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
+
+  // Used by TConnection objects to indicate processing has finished.
+  bool notify(TNonblockingServer::TConnection* conn);
+
+  // Enters the event loop and does not return until a call to stop().
+  virtual void run();
+
+  // Exits the event loop as soon as possible.
+  void stop();
+
+  // Ensures that the event-loop thread is fully finished and shut down.
+  void join();
+
+  /// Registers the events for the notification & listen sockets
+  void registerEvents();
+
+ private:
+  /**
+   * C-callable event handler for signaling task completion.  Provides a
+   * callback that libevent can understand that will read a connection
+   * object's address from a pipe and call connection->transition() for
+   * that object.
+   *
+   * @param fd the descriptor the event occurred on.
+   */
+  static void notifyHandler(evutil_socket_t fd, short which, void* v);
+
+  /**
+   * C-callable event handler for listener events.  Provides a callback
+   * that libevent can understand which invokes server->handleEvent().
+   *
+   * @param fd the descriptor the event occured on.
+   * @param which the flags associated with the event.
+   * @param v void* callback arg where we placed TNonblockingServer's "this".
+   */
+  static void listenHandler(evutil_socket_t fd, short which, void* v) {
+    ((TNonblockingServer*)v)->handleEvent(fd, which);
+  }
+
+  /// Exits the loop ASAP in case of shutdown or error.
+  void breakLoop(bool error);
+
+  /// Create the pipe used to notify I/O process of task completion.
+  void createNotificationPipe();
+
+  /// Unregisters our events for notification and listen sockets.
+  void cleanupEvents();
+
+  /// Sets (or clears) high priority scheduling status for the current thread.
+  void setCurrentThreadHighPriority(bool value);
+
+ private:
+  /// associated server
+  TNonblockingServer* server_;
+
+  /// thread number (for debugging).
+  const int number_;
+
+  /// The actual physical thread id.
+  Thread::id_t threadId_;
+
+  /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
+  THRIFT_SOCKET listenSocket_;
+
+  /// Sets a high scheduling priority when running
+  bool useHighPriority_;
+
+  /// pointer to eventbase to be used for looping
+  event_base* eventBase_;
+
+  /// Set to true if this class is responsible for freeing the event base
+  /// memory.
+  bool ownEventBase_;
+
+  /// Used with eventBase_ for connection events (only in listener thread)
+  struct event serverEvent_;
+
+  /// Used with eventBase_ for task completion notification
+  struct event notificationEvent_;
+
+ /// File descriptors for pipe used for task completion notification.
+  evutil_socket_t notificationPipeFDs_[2];
+
+  /// Actual IO Thread
+  boost::shared_ptr<Thread> thread_;
+};
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_

http://git-wip-us.apache.org/repos/asf/airavata/blob/f891b7dc/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TServer.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TServer.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TServer.cpp
new file mode 100755
index 0000000..f4ce744
--- /dev/null
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/server/TServer.cpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#ifdef HAVE_SYS_TIME_H
+#include <sys/time.h>
+#endif
+#ifdef HAVE_SYS_RESOURCE_H
+#include <sys/resource.h>
+#endif
+
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
+namespace apache { namespace thrift { namespace server {
+
+int increase_max_fds(int max_fds=(1<<24))  {
+  struct rlimit fdmaxrl;
+
+  for(fdmaxrl.rlim_cur = max_fds, fdmaxrl.rlim_max = max_fds;
+      max_fds && (setrlimit(RLIMIT_NOFILE, &fdmaxrl) < 0);
+      fdmaxrl.rlim_cur = max_fds, fdmaxrl.rlim_max = max_fds) {
+    max_fds /= 2;
+  }
+
+  return static_cast<int>(fdmaxrl.rlim_cur);
+}
+
+}}} // apache::thrift::server


Mime
View raw message