httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject svn commit: r1591620 [10/14] - in /httpd/mod_spdy/branches/httpd-2.2.x: ./ base/ base/metrics/ build/ install/ install/common/ install/debian/ install/rpm/ mod_spdy/ mod_spdy/apache/ mod_spdy/apache/filters/ mod_spdy/apache/testing/ mod_spdy/common/ mo...
Date Thu, 01 May 2014 11:39:32 GMT
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,271 @@
+// Copyright 2012 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/testing/spdy_frame_matchers.h"
+
+#include <iostream>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/strings/stringprintf.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "net/spdy/spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gmock/include/gmock/gmock.h"
+
+namespace {
+
+void AppendHeadersString(const net::SpdyNameValueBlock& headers,
+                         std::string* out) {
+  out->append("{ ");
+  bool comma = false;
+  for (net::SpdyNameValueBlock::const_iterator iter = headers.begin();
+       iter != headers.end(); ++iter) {
+    if (comma) {
+      out->append(", ");
+    }
+    base::StringAppendF(out, "'%s': '%s'", iter->first.c_str(),
+                        iter->second.c_str());
+    comma = true;
+  }
+  out->append(" }");
+}
+
+class FrameToStringVisitor : public net::SpdyFrameVisitor {
+ public:
+  explicit FrameToStringVisitor(std::string* out)
+      : out_(out) {
+    CHECK(out_);
+  }
+  virtual ~FrameToStringVisitor() {}
+
+  virtual void VisitSynStream(const net::SpdySynStreamIR& syn_stream) {
+    // TODO(mdsteele): include other fields
+    base::StringAppendF(
+        out_, "SYN_STREAM(%u p%u%s%s)",
+        static_cast<unsigned>(syn_stream.stream_id()),
+        static_cast<unsigned>(syn_stream.priority()),
+        (syn_stream.fin() ? " fin" : ""),
+        (syn_stream.unidirectional() ? " unidirectional" : ""));
+    AppendHeadersString(syn_stream.name_value_block(), out_);
+  }
+  virtual void VisitSynReply(const net::SpdySynReplyIR& syn_reply) {
+    base::StringAppendF(
+        out_, "SYN_REPLY(%u%s)",
+        static_cast<unsigned>(syn_reply.stream_id()),
+        (syn_reply.fin() ? " fin" : ""));
+    AppendHeadersString(syn_reply.name_value_block(), out_);
+  }
+  virtual void VisitRstStream(const net::SpdyRstStreamIR& rst_stream) {
+    base::StringAppendF(
+        out_, "RST_STREAM(%u %s)",
+        static_cast<unsigned>(rst_stream.stream_id()),
+        mod_spdy::RstStreamStatusCodeToString(rst_stream.status()));
+  }
+  virtual void VisitSettings(const net::SpdySettingsIR& settings) {
+    base::StringAppendF(
+        out_, "SETTINGS(%s",
+        (settings.clear_settings() ? "clear " : ""));
+    bool comma = false;
+    for (net::SpdySettingsIR::ValueMap::const_iterator iter =
+             settings.values().begin(), end = settings.values().end();
+         iter != end; ++iter) {
+      if (comma) {
+        out_->append(", ");
+      }
+      base::StringAppendF(
+          out_, "%s%s%s: %d",
+          (iter->second.persist_value ? "persist " : ""),
+          (iter->second.persisted ? "persisted " : ""),
+          mod_spdy::SettingsIdToString(iter->first),
+          static_cast<int>(iter->second.value));
+    }
+    out_->append(")");
+  }
+  virtual void VisitPing(const net::SpdyPingIR& ping) {
+    base::StringAppendF(
+        out_, "PING(%u)", static_cast<unsigned>(ping.id()));
+  }
+  virtual void VisitGoAway(const net::SpdyGoAwayIR& goaway) {
+    base::StringAppendF(
+        out_, "GOAWAY(%u %s)",
+        static_cast<unsigned>(goaway.last_good_stream_id()),
+        mod_spdy::GoAwayStatusCodeToString(goaway.status()));
+  }
+  virtual void VisitHeaders(const net::SpdyHeadersIR& headers) {
+    base::StringAppendF(
+        out_, "HEADERS(%u%s)", static_cast<unsigned>(headers.stream_id()),
+        (headers.fin() ? " fin" : ""));
+    AppendHeadersString(headers.name_value_block(), out_);
+  }
+  virtual void VisitWindowUpdate(const net::SpdyWindowUpdateIR& window) {
+    base::StringAppendF(
+        out_, "WINDOW_UPDATE(%u %+d)",
+        static_cast<unsigned>(window.stream_id()),
+        static_cast<int>(window.delta()));
+  }
+  virtual void VisitCredential(const net::SpdyCredentialIR& credential) {
+    // TODO(mdsteele): include other fields
+    base::StringAppendF(
+        out_, "CREDENTIAL(%d)", static_cast<int>(credential.slot()));
+  }
+  virtual void VisitBlocked(const net::SpdyBlockedIR& blocked) {
+    base::StringAppendF(
+        out_, "BLOCKED(%u)", static_cast<unsigned>(blocked.stream_id()));
+  }
+  virtual void VisitPushPromise(const net::SpdyPushPromiseIR& push_promise) {
+    base::StringAppendF(
+        out_, "PUSH_PROMISE(%u, %u)",
+        static_cast<unsigned>(push_promise.stream_id()),
+        static_cast<unsigned>(push_promise.promised_stream_id()));
+  }
+  virtual void VisitData(const net::SpdyDataIR& data) {
+    base::StringAppendF(
+        out_, "DATA(%u%s \"", static_cast<unsigned>(data.stream_id()),
+        (data.fin() ? " fin" : ""));
+    out_->append(data.data().data(), data.data().size());
+    out_->append("\")");
+  }
+
+ private:
+  std::string* out_;
+
+  DISALLOW_COPY_AND_ASSIGN(FrameToStringVisitor);
+};
+
+void AppendSpdyFrameToString(const net::SpdyFrameIR& frame, std::string* out) {
+  FrameToStringVisitor visitor(out);
+  frame.Visit(&visitor);
+}
+
+class IsEquivalentFrameMatcher :
+      public ::testing::MatcherInterface<const net::SpdyFrameIR&> {
+ public:
+  explicit IsEquivalentFrameMatcher(const net::SpdyFrameIR& frame);
+  virtual ~IsEquivalentFrameMatcher();
+  virtual bool MatchAndExplain(const net::SpdyFrameIR& frame,
+                               ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  std::string expected_;
+
+  DISALLOW_COPY_AND_ASSIGN(IsEquivalentFrameMatcher);
+};
+
+IsEquivalentFrameMatcher::IsEquivalentFrameMatcher(
+    const net::SpdyFrameIR& frame) {
+  AppendSpdyFrameToString(frame, &expected_);
+}
+
+IsEquivalentFrameMatcher::~IsEquivalentFrameMatcher() {}
+
+bool IsEquivalentFrameMatcher::MatchAndExplain(
+    const net::SpdyFrameIR& frame,
+    ::testing::MatchResultListener* listener) const {
+  std::string actual;
+  AppendSpdyFrameToString(frame, &actual);
+  if (actual != expected_) {
+    *listener << "is a " << actual << " frame";
+    return false;
+  }
+  return true;
+}
+
+void IsEquivalentFrameMatcher::DescribeTo(std::ostream* out) const {
+  *out << "is a " << expected_ << " frame";
+}
+
+void IsEquivalentFrameMatcher::DescribeNegationTo(std::ostream* out) const {
+  *out << "isn't a " << expected_ << " frame";
+}
+
+}  // namespace
+
+namespace mod_spdy {
+
+namespace testing {
+
+::testing::Matcher<const net::SpdyFrameIR&> IsSynStream(
+     net::SpdyStreamId stream_id, net::SpdyStreamId assoc_stream_id,
+     net::SpdyPriority priority, bool fin, bool unidirectional,
+     const net::SpdyNameValueBlock& headers) {
+  net::SpdySynStreamIR frame(stream_id);
+  frame.set_associated_to_stream_id(assoc_stream_id);
+  frame.set_priority(priority);
+  frame.set_fin(fin);
+  frame.set_unidirectional(unidirectional);
+  frame.GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+  return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsSynReply(
+     net::SpdyStreamId stream_id, bool fin,
+     const net::SpdyNameValueBlock& headers) {
+  net::SpdySynReplyIR frame(stream_id);
+  frame.set_fin(fin);
+  frame.GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+  return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsRstStream(
+     net::SpdyStreamId stream_id, net::SpdyRstStreamStatus status) {
+  net::SpdyRstStreamIR frame(stream_id, status);
+  return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsSettings(
+     net::SpdySettingsIds id, int32 value) {
+  net::SpdySettingsIR frame;
+  frame.AddSetting(id, false, false, value);
+  return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsPing(net::SpdyPingId ping_id) {
+  net::SpdyPingIR frame(ping_id);
+  return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsGoAway(
+     net::SpdyStreamId last_good_stream_id, net::SpdyGoAwayStatus status) {
+  net::SpdyGoAwayIR frame(last_good_stream_id, status);
+  return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsHeaders(
+     net::SpdyStreamId stream_id, bool fin,
+     const net::SpdyNameValueBlock& headers) {
+  net::SpdyHeadersIR frame(stream_id);
+  frame.set_fin(fin);
+  frame.GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+  return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsWindowUpdate(
+     net::SpdyStreamId stream_id, uint32 delta) {
+  net::SpdyWindowUpdateIR frame(stream_id, delta);
+  return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsDataFrame(
+    net::SpdyStreamId stream_id, bool fin, base::StringPiece payload) {
+  net::SpdyDataIR frame(stream_id, payload);
+  frame.set_fin(fin);
+  return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+}  // namespace testing
+
+}  // namespace mod_spdy

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.h Thu May  1 11:39:27 2014
@@ -0,0 +1,79 @@
+// Copyright 2012 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_TESTING_SPDY_FRAME_MATCHERS_H_
+#define MOD_SPDY_TESTING_SPDY_FRAME_MATCHERS_H_
+
+#include "base/basictypes.h"
+#include "base/strings/string_piece.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gmock/include/gmock/gmock.h"
+
+namespace mod_spdy {
+
+namespace testing {
+
+// Make a matcher that requires the argument to be a SYN_STREAM frame with the
+// given stream ID, associated stream ID, priority, flag_fin,
+// flag_unidirectional, and headers.
+::testing::Matcher<const net::SpdyFrameIR&> IsSynStream(
+     net::SpdyStreamId stream_id, net::SpdyStreamId assoc_stream_id,
+     net::SpdyPriority priority, bool fin, bool unidirectional,
+     const net::SpdyNameValueBlock& headers);
+
+// Make a matcher that requires the argument to be a SYN_REPLY frame with the
+// given stream ID, flag_fin, and headers.
+::testing::Matcher<const net::SpdyFrameIR&> IsSynReply(
+     net::SpdyStreamId stream_id, bool fin,
+     const net::SpdyNameValueBlock& headers);
+
+// Make a matcher that requires the argument to be a RST_STREAM frame with the
+// given stream ID and status code.
+::testing::Matcher<const net::SpdyFrameIR&> IsRstStream(
+     net::SpdyStreamId stream_id, net::SpdyRstStreamStatus status);
+
+// Make a matcher that requires the argument to be a SETTINGS frame with the
+// given setting.
+::testing::Matcher<const net::SpdyFrameIR&> IsSettings(
+     net::SpdySettingsIds id, int32 value);
+
+// Make a matcher that requires the argument to be a PING frame with the
+// given ID.
+::testing::Matcher<const net::SpdyFrameIR&> IsPing(net::SpdyPingId ping_id);
+
+// Make a matcher that requires the argument to be a GOAWAY frame with the
+// given last-good-stream-ID and status code.
+::testing::Matcher<const net::SpdyFrameIR&> IsGoAway(
+     net::SpdyStreamId last_good_stream_id, net::SpdyGoAwayStatus status);
+
+// Make a matcher that requires the argument to be a HEADERS frame with the
+// given stream ID, flag_fin, and headers.
+::testing::Matcher<const net::SpdyFrameIR&> IsHeaders(
+     net::SpdyStreamId stream_id, bool fin,
+     const net::SpdyNameValueBlock& headers);
+
+// Make a matcher that requires the argument to be a WINDOW_UPDATE frame with
+// the given window-size-delta.
+::testing::Matcher<const net::SpdyFrameIR&> IsWindowUpdate(
+     net::SpdyStreamId stream_id, uint32 delta);
+
+// Make a matcher that requires the argument to be a DATA frame.
+::testing::Matcher<const net::SpdyFrameIR&> IsDataFrame(
+     net::SpdyStreamId stream_id, bool fin, base::StringPiece payload);
+
+}  // namespace testing
+
+}  // namespace mod_spdy
+
+#endif  // MOD_SPDY_TESTING_SPDY_FRAME_MATCHERS_H_

Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,490 @@
+// Copyright 2012 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/thread_pool.h"
+
+#include <map>
+#include <set>
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/platform_thread.h"
+#include "base/time/time.h"
+#include "mod_spdy/common/executor.h"
+#include "net/instaweb/util/public/function.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace {
+
+// Shut down a worker thread after it has been idle for this many seconds:
+const int64 kDefaultMaxWorkerIdleSeconds = 60;
+
+}  // namespace
+
+namespace mod_spdy {
+
+// An executor that uses the ThreadPool to execute tasks.  Returned by
+// ThreadPool::NewExecutor.
+class ThreadPool::ThreadPoolExecutor : public Executor {
+ public:
+  explicit ThreadPoolExecutor(ThreadPool* master)
+      : master_(master),
+        stopping_condvar_(&master_->lock_),
+        stopped_(false) {}
+  virtual ~ThreadPoolExecutor() { Stop(); }
+
+  // Executor methods:
+  virtual void AddTask(net_instaweb::Function* task,
+                       net::SpdyPriority priority);
+  virtual void Stop();
+
+ private:
+  friend class ThreadPool;
+  ThreadPool* const master_;
+  base::ConditionVariable stopping_condvar_;
+  bool stopped_;  // protected by master_->lock_
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPoolExecutor);
+};
+
+// Add a task to the executor; if the executor has already been stopped, just
+// cancel the task immediately.
+void ThreadPool::ThreadPoolExecutor::AddTask(net_instaweb::Function* task,
+                                             net::SpdyPriority priority) {
+  {
+    base::AutoLock autolock(master_->lock_);
+
+    // Clean up any zombie WorkerThreads in the ThreadPool that are waiting for
+    // reaping.  If the OS process we're in accumulates too many unjoined
+    // zombie threads over time, the OS might not be able to spawn a new thread
+    // below.  So right now is a good time to clean them up.
+    if (!master_->zombies_.empty()) {
+      std::set<WorkerThread*> zombies;
+      zombies.swap(master_->zombies_);
+      // Joining these threads should be basically instant, since they've
+      // already terminated.  But to be safe, let's unlock while we join them.
+      base::AutoUnlock autounlock(master_->lock_);
+      ThreadPool::JoinThreads(zombies);
+    }
+
+    // The thread pool shouldn't be shutting down until all executors are
+    // destroyed.  Since this executor clearly still exists, the thread pool
+    // must still be open.
+    DCHECK(!master_->shutting_down_);
+
+    // If the executor hasn't been stopped, add the task to the queue and
+    // notify a worker that there's a new task ready to be taken.
+    if (!stopped_) {
+      master_->task_queue_.insert(std::make_pair(priority, Task(task, this)));
+      master_->worker_condvar_.Signal();
+      master_->StartNewWorkerIfNeeded();
+      return;
+    }
+  }
+
+  // If this executor has already been stopped, just cancel the task (after
+  // releasing the lock).
+  task->CallCancel();
+}
+
+// Stop the executor.  Cancel all pending tasks in the thread pool owned by
+// this executor, and then block until all active tasks owned by this executor
+// complete.  Stopping the executor more than once has no effect.
+void ThreadPool::ThreadPoolExecutor::Stop() {
+  std::vector<net_instaweb::Function*> functions_to_cancel;
+  {
+    base::AutoLock autolock(master_->lock_);
+    if (stopped_) {
+      return;
+    }
+    stopped_ = true;
+
+    // Remove all tasks owned by this executor from the queue, and collect up
+    // the function objects to be cancelled.
+    TaskQueue::iterator next_iter = master_->task_queue_.begin();
+    while (next_iter != master_->task_queue_.end()) {
+      TaskQueue::iterator iter = next_iter;
+      const Task& task = iter->second;
+      ++next_iter;  // Increment next_iter _before_ we might erase iter.
+      if (task.owner == this) {
+        functions_to_cancel.push_back(task.function);
+        master_->task_queue_.erase(iter);
+      }
+    }
+  }
+
+  // Unlock while we cancel the functions, so we're not hogging the lock for
+  // too long, and to avoid potential deadlock if the cancel method tries to do
+  // anything with the thread pool.
+  for (std::vector<net_instaweb::Function*>::const_iterator iter =
+           functions_to_cancel.begin();
+       iter != functions_to_cancel.end(); ++iter) {
+    (*iter)->CallCancel();
+  }
+  // CallCancel deletes the Function objects, invalidating the pointers in this
+  // list, so let's go ahead and clear it (which also saves a little memory
+  // while we're blocked below).
+  functions_to_cancel.clear();
+
+  // Block until all our active tasks are completed.
+  {
+    base::AutoLock autolock(master_->lock_);
+    while (master_->active_task_counts_.count(this) > 0) {
+      stopping_condvar_.Wait();
+    }
+  }
+}
+
+// A WorkerThread object wraps a platform-specific thread handle, and provides
+// the method run by that thread (ThreadMain).
+class ThreadPool::WorkerThread : public base::PlatformThread::Delegate {
+ public:
+  explicit WorkerThread(ThreadPool* master);
+  virtual ~WorkerThread();
+
+  // Start the thread running.  Return false on failure.  If this succeeds,
+  // then you must call Join() before deleting this object.
+  bool Start();
+
+  // Block until the thread completes.  You must set master_->shutting_down_ to
+  // true before calling this method, or the thread will never terminate.
+  // You shouldn't be holding master_->lock_ when calling this.
+  void Join();
+
+  // base::PlatformThread::Delegate method:
+  virtual void ThreadMain();
+
+ private:
+  enum ThreadState { NOT_STARTED, STARTED, JOINED };
+
+  ThreadPool* const master_;
+  // If two master threads are sharing the same ThreadPool, then Start() and
+  // Join() might get called by different threads.  So to be safe we use a lock
+  // to protect the two below fields.
+  base::Lock thread_lock_;
+  ThreadState state_;
+  base::PlatformThreadHandle thread_id_;
+
+  DISALLOW_COPY_AND_ASSIGN(WorkerThread);
+};
+
+ThreadPool::WorkerThread::WorkerThread(ThreadPool* master)
+    : master_(master), state_(NOT_STARTED), thread_id_() {}
+
+ThreadPool::WorkerThread::~WorkerThread() {
+  base::AutoLock autolock(thread_lock_);
+  // If we started the thread, we _must_ join it before deleting this object,
+  // or else the thread won't get cleaned up by the OS.
+  DCHECK(state_ == NOT_STARTED || state_ == JOINED);
+}
+
+bool ThreadPool::WorkerThread::Start() {
+  base::AutoLock autolock(thread_lock_);
+  DCHECK_EQ(NOT_STARTED, state_);
+  if (base::PlatformThread::Create(0, this, &thread_id_)) {
+    state_ = STARTED;
+    return true;
+  }
+  return false;
+}
+
+void ThreadPool::WorkerThread::Join() {
+  base::AutoLock autolock(thread_lock_);
+  DCHECK_EQ(STARTED, state_);
+  base::PlatformThread::Join(thread_id_);
+  state_ = JOINED;
+}
+
+// This is the code executed by the thread; when this method returns, the
+// thread will terminate.
+void ThreadPool::WorkerThread::ThreadMain() {
+  // We start by grabbing the master lock, but we release it below whenever we
+  // are 1) waiting for a new task or 2) executing a task.  So in fact most of
+  // the time we are not holding the lock.
+  base::AutoLock autolock(master_->lock_);
+  while (true) {
+    // Wait until there's a task available (or we're shutting down), but don't
+    // stay idle for more than kMaxWorkerIdleSeconds seconds.
+    base::TimeDelta time_remaining = master_->max_thread_idle_time_;
+    while (!master_->shutting_down_ && master_->task_queue_.empty() &&
+           time_remaining.InSecondsF() > 0.0) {
+      // Note that TimedWait can wake up spuriously before the time runs out,
+      // so we need to measure how long we actually waited for.
+      const base::Time start = base::Time::Now();
+      master_->worker_condvar_.TimedWait(time_remaining);
+      const base::Time end = base::Time::Now();
+      // Note that the system clock can go backwards if it is reset, so make
+      // sure we never _increase_ time_remaining.
+      if (end > start) {
+        time_remaining -= end - start;
+      }
+    }
+
+    // If the thread pool is shutting down, terminate this thread; the master
+    // is about to join/delete us (in its destructor).
+    if (master_->shutting_down_) {
+      return;
+    }
+
+    // If we ran out of time without getting a task, maybe this thread should
+    // shut itself down.
+    if (master_->task_queue_.empty()) {
+      DCHECK_LE(time_remaining.InSecondsF(), 0.0);
+      // Ask the master if we should stop.  If this returns true, this worker
+      // has been zombified, so we're free to terminate the thread.
+      if (master_->TryZombifyIdleThread(this)) {
+        return;  // Yes, we should stop; terminate the thread.
+      } else {
+        continue;  // No, we shouldn't stop; jump to the top of the while loop.
+      }
+    }
+
+    // Otherwise, there must be at least one task available now.  Grab one from
+    // the master, who will then treat us as busy until we complete it.
+    const Task task = master_->GetNextTask();
+    // Release the lock while we execute the task.  Note that we use AutoUnlock
+    // here rather than one AutoLock for the above code and another for the
+    // below code, so that we don't have to release and reacquire the lock at
+    // the edge of the while-loop.
+    {
+      base::AutoUnlock autounlock(master_->lock_);
+      task.function->CallRun();
+    }
+    // Inform the master we have completed the task and are no longer busy.
+    master_->OnTaskComplete(task);
+  }
+}
+
+ThreadPool::ThreadPool(int min_threads, int max_threads)
+    : min_threads_(min_threads),
+      max_threads_(max_threads),
+      max_thread_idle_time_(
+          base::TimeDelta::FromSeconds(kDefaultMaxWorkerIdleSeconds)),
+      worker_condvar_(&lock_),
+      num_busy_workers_(0),
+      shutting_down_(false) {
+  DCHECK_GE(max_thread_idle_time_.InSecondsF(), 0.0);
+  // Note that we check e.g. min_threads rather than min_threads_ (which is
+  // unsigned), in order to catch negative numbers.
+  DCHECK_GE(min_threads, 1);
+  DCHECK_GE(max_threads, 1);
+  DCHECK_LE(min_threads_, max_threads_);
+}
+
+ThreadPool::ThreadPool(int min_threads, int max_threads,
+                       base::TimeDelta max_thread_idle_time)
+    : min_threads_(min_threads),
+      max_threads_(max_threads),
+      max_thread_idle_time_(max_thread_idle_time),
+      worker_condvar_(&lock_),
+      num_busy_workers_(0),
+      shutting_down_(false) {
+  DCHECK_GE(max_thread_idle_time_.InSecondsF(), 0.0);
+  DCHECK_GE(min_threads, 1);
+  DCHECK_GE(max_threads, 1);
+  DCHECK_LE(min_threads_, max_threads_);
+}
+
+ThreadPool::~ThreadPool() {
+  base::AutoLock autolock(lock_);
+
+  // If we're doing things right, all the Executors should have been
+  // destroyed before the ThreadPool is destroyed, so there should be no
+  // pending or active tasks.
+  DCHECK(task_queue_.empty());
+  DCHECK(active_task_counts_.empty());
+
+  // Wake up all the worker threads and tell them to shut down.
+  shutting_down_ = true;
+  worker_condvar_.Broadcast();
+
+  // Clean up all our threads.
+  std::set<WorkerThread*> threads;
+  zombies_.swap(threads);
+  threads.insert(workers_.begin(), workers_.end());
+  workers_.clear();
+  {
+    base::AutoUnlock autounlock(lock_);
+    JoinThreads(threads);
+  }
+
+  // Because we had shutting_down_ set to true, nothing should have been added
+  // to our WorkerThread sets while we were unlocked.  So we should be all
+  // cleaned up now.
+  DCHECK(workers_.empty());
+  DCHECK(zombies_.empty());
+  DCHECK(task_queue_.empty());
+  DCHECK(active_task_counts_.empty());
+}
+
+bool ThreadPool::Start() {
+  base::AutoLock autolock(lock_);
+  DCHECK(task_queue_.empty());
+  DCHECK(workers_.empty());
+  // Start up min_threads_ workers; if any of the worker threads fail to start,
+  // then this method fails and the ThreadPool should be deleted.
+  for (unsigned int i = 0; i < min_threads_; ++i) {
+    scoped_ptr<WorkerThread> worker(new WorkerThread(this));
+    if (!worker->Start()) {
+      return false;
+    }
+    workers_.insert(worker.release());
+  }
+  DCHECK_EQ(min_threads_, workers_.size());
+  return true;
+}
+
+Executor* ThreadPool::NewExecutor() {
+  return new ThreadPoolExecutor(this);
+}
+
+int ThreadPool::GetNumWorkersForTest() {
+  base::AutoLock autolock(lock_);
+  return workers_.size();
+}
+
+int ThreadPool::GetNumIdleWorkersForTest() {
+  base::AutoLock autolock(lock_);
+  DCHECK_GE(num_busy_workers_, 0u);
+  DCHECK_LE(num_busy_workers_, workers_.size());
+  return workers_.size() - num_busy_workers_;
+}
+
+int ThreadPool::GetNumZombiesForTest() {
+  base::AutoLock autolock(lock_);
+  return zombies_.size();
+}
+
+// This method is called each time we add a new task to the thread pool.
+void ThreadPool::StartNewWorkerIfNeeded() {
+  lock_.AssertAcquired();
+  DCHECK_GE(num_busy_workers_, 0u);
+  DCHECK_LE(num_busy_workers_, workers_.size());
+  DCHECK_GE(workers_.size(), min_threads_);
+  DCHECK_LE(workers_.size(), max_threads_);
+
+  // We create a new worker to handle the task _unless_ either 1) we're already
+  // at the maximum number of threads, or 2) there are already enough idle
+  // workers sitting around to take on this task (and all other pending tasks
+  // that the idle workers haven't yet had a chance to pick up).
+  if (workers_.size() >= max_threads_ ||
+      task_queue_.size() <= workers_.size() - num_busy_workers_) {
+    return;
+  }
+
+  scoped_ptr<WorkerThread> worker(new WorkerThread(this));
+  if (worker->Start()) {
+    workers_.insert(worker.release());
+  } else {
+    LOG(ERROR) << "Failed to start new worker thread.";
+  }
+}
+
+// static
+void ThreadPool::JoinThreads(const std::set<WorkerThread*>& threads) {
+  for (std::set<WorkerThread*>::const_iterator iter = threads.begin();
+       iter != threads.end(); ++iter) {
+    WorkerThread* thread = *iter;
+    thread->Join();
+    delete thread;
+  }
+}
+
+// Call when the worker thread has been idle for a while.  Either return false
+// (worker should continue waiting for tasks), or zombify the worker and return
+// true (worker thread should immediately terminate).
+bool ThreadPool::TryZombifyIdleThread(WorkerThread* thread) {
+  lock_.AssertAcquired();
+
+  // Don't terminate the thread if the thread pool is already at the minimum
+  // number of threads.
+  DCHECK_GE(workers_.size(), min_threads_);
+  if (workers_.size() <= min_threads_) {
+    return false;
+  }
+
+  // Remove this thread from the worker set.
+  DCHECK_EQ(1u, workers_.count(thread));
+  workers_.erase(thread);
+
+  // When a (joinable) thread terminates, it must still be cleaned up, either
+  // by another thread joining it, or by detatching it.  However, the thread
+  // pool's not shutting down here, so the master thread doesn't know to join
+  // this thread that we're in now, and the Chromium thread abstraction we're
+  // using doesn't currently allow us to detach a thread.  So instead, we place
+  // this WorkerThread object into a "zombie" set, which the master thread can
+  // reap later on.  Threads that have terminated but that haven't been joined
+  // yet use up only a small amount of memory (I think), so it's okay if we
+  // don't reap it right away, as long as we don't try to spawn new threads
+  // while there's still lots of zombies.
+  DCHECK(!shutting_down_);
+  DCHECK_EQ(0u, zombies_.count(thread));
+  zombies_.insert(thread);
+  return true;
+}
+
+// Get and return the next task from the queue (which must be non-empty), and
+// update our various counters to indicate that the calling worker is busy
+// executing this task.
+ThreadPool::Task ThreadPool::GetNextTask() {
+  lock_.AssertAcquired();
+
+  // Pop the highest-priority task from the queue.  Note that smaller values
+  // correspond to higher priorities (SPDY draft 3 section 2.3.3), so
+  // task_queue_.begin() gets us the highest-priority pending task.
+  DCHECK(!task_queue_.empty());
+  TaskQueue::iterator task_iter = task_queue_.begin();
+  const Task task = task_iter->second;
+  task_queue_.erase(task_iter);
+
+  // Increment the count of active tasks for the executor that owns this
+  // task; we'll decrement it again when the task completes.
+  ++(active_task_counts_[task.owner]);
+
+  // The worker that takes this task will be busy until it completes it.
+  DCHECK_LT(num_busy_workers_, workers_.size());
+  ++num_busy_workers_;
+
+  return task;
+}
+
+// Call to indicate that the task has been completed; update our various
+// counters to indicate that the calling worker is no longer busy executing
+// this task.
+void ThreadPool::OnTaskComplete(Task task) {
+  lock_.AssertAcquired();
+
+  // The worker that just finished this task is no longer busy.
+  DCHECK_GE(num_busy_workers_, 1u);
+  --num_busy_workers_;
+
+  // We've completed the task and reaquired the lock, so decrement the count
+  // of active tasks for this owner.
+  OwnerMap::iterator count_iter = active_task_counts_.find(task.owner);
+  DCHECK(count_iter != active_task_counts_.end());
+  DCHECK(count_iter->second > 0);
+  --(count_iter->second);
+
+  // If this was the last active task for the owner, notify anyone who might be
+  // waiting for the owner to stop.
+  if (count_iter->second == 0) {
+    active_task_counts_.erase(count_iter);
+    task.owner->stopping_condvar_.Broadcast();
+  }
+}
+
+}  // namespace mod_spdy

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.h Thu May  1 11:39:27 2014
@@ -0,0 +1,145 @@
+// Copyright 2012 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_THREAD_POOL_H_
+#define MOD_SPDY_COMMON_THREAD_POOL_H_
+
+#include <map>
+#include <set>
+
+#include "base/basictypes.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/time/time.h"
+#include "net/spdy/spdy_protocol.h"  // for net::SpdyPriority
+
+namespace net_instaweb { class Function; }
+
+namespace mod_spdy {
+
+class Executor;
+
+// A ThreadPool keeps a pool of threads waiting to perform tasks.  One can
+// create any number of Executor objects, using the NewExecutor method, which
+// will all share the threads for executing tasks.  If more tasks are queued
+// than there are threads in the pool, these executors will respect task
+// priorities when deciding which tasks to execute first.
+class ThreadPool {
+ public:
+  // Create a new thread pool that uses at least min_threads threads, and at
+  // most max_threads threads, at a time.  min_threads must be no greater than
+  // max_threads, and both must be positive.
+  ThreadPool(int min_threads, int max_threads);
+
+  // As above, but specify the amount of time after which to kill idle threads,
+  // rather than using the default value (this is primarily for testing).
+  // max_thread_idle_time must be non-negative.
+  ThreadPool(int min_threads, int max_threads,
+             base::TimeDelta max_thread_idle_time);
+
+  // The destructor will block until all threads in the pool have shut down.
+  // The ThreadPool must not be destroyed until all Executor objects returned
+  // from the NewExecutor method have first been deleted.
+  ~ThreadPool();
+
+  // Start up the thread pool.  Must be called exactly one before using the
+  // thread pool; returns true on success, or false on failure.  If startup
+  // fails, the ThreadPool must be immediately deleted.
+  bool Start();
+
+  // Return a new Executor object that uses this thread pool to perform tasks.
+  // The caller gains ownership of the returned Executor, and the ThreadPool
+  // must outlive the returned Executor.
+  Executor* NewExecutor();
+
+  // Return the current total number of worker threads.  This is provided for
+  // testing purposes only.
+  int GetNumWorkersForTest();
+  // Return the number of worker threads currently idle.  This is provided for
+  // testing purposes only.
+  int GetNumIdleWorkersForTest();
+  // Return the number of terminated (zombie) threads that have yet to be
+  // reaped.  This is provided for testing purposes only.
+  int GetNumZombiesForTest();
+
+ private:
+  class ThreadPoolExecutor;
+  class WorkerThread;
+
+  // A Task is a simple pair of the Function to run, and the executor to which
+  // the task was added.
+  struct Task {
+    Task(net_instaweb::Function* fun, ThreadPoolExecutor* own)
+        : function(fun), owner(own) {}
+    net_instaweb::Function* function;
+    ThreadPoolExecutor* owner;
+  };
+
+  typedef std::multimap<net::SpdyPriority, Task> TaskQueue;
+  typedef std::map<const ThreadPoolExecutor*, int> OwnerMap;
+
+  // Start a new worker thread if 1) the task queue is larger than the number
+  // of currently idle workers, and 2) we have fewer than the maximum number of
+  // workers.  Otherwise, do nothing.  Must be holding lock_ when calling this.
+  void StartNewWorkerIfNeeded();
+
+  // Join and delete all worker threads in the given set.  This will block
+  // until all the threads have terminated and been cleaned up, so don't call
+  // this while holding the lock_.
+  static void JoinThreads(const std::set<WorkerThread*>& threads);
+
+  // These calls are used to implement the WorkerThread's main function.  Must
+  // be holding lock_ when calling any of these.
+  bool TryZombifyIdleThread(WorkerThread* thread);
+  Task GetNextTask();
+  void OnTaskComplete(Task task);
+
+  // The min and max number of threads passed to the constructor.  Although the
+  // constructor takes signed ints (for convenience), we store these unsigned
+  // to avoid the need for static_casts when comparing against workers_.size().
+  const unsigned int min_threads_;
+  const unsigned int max_threads_;
+  const base::TimeDelta max_thread_idle_time_;
+  // This single master lock protects all of the below fields, as well as any
+  // mutable data and condition variables in the worker threads and executors.
+  // Having just one lock makes everything much easier to understand.
+  base::Lock lock_;
+  // Workers wait on this condvar when waiting for a new task.  We signal it
+  // when a new task becomes available, or when we need to shut down.
+  base::ConditionVariable worker_condvar_;
+  // The list of running worker threads.  We keep this around so that we can
+  // join the threads on shutdown.
+  std::set<WorkerThread*> workers_;
+  // Worker threads that have shut themselves down (due to being idle), and are
+  // awaiting cleanup by the master thread.
+  std::set<WorkerThread*> zombies_;
+  // How many workers do we have that are actually executing tasks?
+  unsigned int num_busy_workers_;
+  // We set this to true to tell the worker threads to terminate.
+  bool shutting_down_;
+  // The priority queue of pending tasks.  Invariant: all Function objects in
+  // the queue have neither been started nor cancelled yet.
+  TaskQueue task_queue_;
+  // This maps executors to the number of currently running tasks for that
+  // executor; we increment when we start a task, and decrement when we finish
+  // it.  If the number is zero, we remove the entry from the map; thus, as an
+  // invariant the map only contains entries for executors with active tasks.
+  OwnerMap active_task_counts_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPool);
+};
+
+}  // namespace mod_spdy
+
+#endif  // MOD_SPDY_COMMON_THREAD_POOL_H_

Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool_test.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool_test.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool_test.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool_test.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,339 @@
+// Copyright 2012 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/thread_pool.h"
+
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/platform_thread.h"
+#include "base/time/time.h"
+#include "mod_spdy/common/executor.h"
+#include "mod_spdy/common/testing/notification.h"
+#include "net/instaweb/util/public/function.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+// When adding tests here, try to keep them robust against thread scheduling
+// differences from run to run.  In particular, they shouldn't fail just
+// because you're running under Valgrind.
+
+namespace {
+
+// When run, a TestFunction waits for `wait` millis, then sets `*result` to
+// RAN.  When cancelled, it sets *result to CANCELLED.
+class TestFunction : public net_instaweb::Function {
+ public:
+  enum Result { NOTHING, RAN, CANCELLED };
+  TestFunction(int wait, base::Lock* lock, Result* result)
+      : wait_(wait), lock_(lock), result_(result) {}
+  virtual ~TestFunction() {}
+ protected:
+  // net_instaweb::Function methods:
+  virtual void Run() {
+    base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(wait_));
+    base::AutoLock autolock(*lock_);
+    *result_ = RAN;
+  }
+  virtual void Cancel() {
+    base::AutoLock autolock(*lock_);
+    *result_ = CANCELLED;
+  }
+ private:
+  const int wait_;
+  base::Lock* const lock_;
+  Result* const result_;
+  DISALLOW_COPY_AND_ASSIGN(TestFunction);
+};
+
+// Test that we execute tasks concurrently, that that we respect priorities
+// when pulling tasks from the queue.
+TEST(ThreadPoolTest, ConcurrencyAndPrioritization) {
+  // Create a thread pool with 2 threads, and an executor.
+  mod_spdy::ThreadPool thread_pool(2, 2);
+  ASSERT_TRUE(thread_pool.Start());
+  scoped_ptr<mod_spdy::Executor> executor(thread_pool.NewExecutor());
+
+  base::Lock lock;
+  TestFunction::Result result0 = TestFunction::NOTHING;
+  TestFunction::Result result1 = TestFunction::NOTHING;
+  TestFunction::Result result2 = TestFunction::NOTHING;
+  TestFunction::Result result3 = TestFunction::NOTHING;
+
+  // Create a high-priority TestFunction, which waits for 200 millis then
+  // records that it ran.
+  executor->AddTask(new TestFunction(200, &lock, &result0), 0);
+  // Create several TestFunctions at different priorities.  Each waits 100
+  // millis then records that it ran.
+  executor->AddTask(new TestFunction(100, &lock, &result1), 1);
+  executor->AddTask(new TestFunction(100, &lock, &result3), 3);
+  executor->AddTask(new TestFunction(100, &lock, &result2), 2);
+
+  // Wait 150 millis, then stop the executor.
+  base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(150));
+  executor->Stop();
+
+  // Only TestFunctions that _started_ within the first 150 millis should have
+  // run; the others should have been cancelled.
+  //   - The priority-0 function should have started first, on the first
+  //     thread.  It finishes after 200 millis.
+  //   - The priority-1 function should run on the second thread.  It finishes
+  //     after 100 millis.
+  //   - The priority-2 function should run on the second thread after the
+  //     priority-1 function finishes, even though it was pushed last, because
+  //     it's higher-priority than the priority-3 function.  It finishes at the
+  //     200-milli mark.
+  //   - The priority-3 function should not get a chance to run, because we
+  //     stop the executor after 150 millis, and the soonest it could start is
+  //     the 200-milli mark.
+  base::AutoLock autolock(lock);
+  EXPECT_EQ(TestFunction::RAN, result0);
+  EXPECT_EQ(TestFunction::RAN, result1);
+  EXPECT_EQ(TestFunction::RAN, result2);
+  EXPECT_EQ(TestFunction::CANCELLED, result3);
+}
+
+// Test that stopping one executor doesn't affect tasks on another executor
+// from the same ThreadPool.
+TEST(ThreadPoolTest, MultipleExecutors) {
+  // Create a thread pool with 3 threads, and two executors.
+  mod_spdy::ThreadPool thread_pool(3, 3);
+  ASSERT_TRUE(thread_pool.Start());
+  scoped_ptr<mod_spdy::Executor> executor1(thread_pool.NewExecutor());
+  scoped_ptr<mod_spdy::Executor> executor2(thread_pool.NewExecutor());
+
+  base::Lock lock;
+  TestFunction::Result e1r1 = TestFunction::NOTHING;
+  TestFunction::Result e1r2 = TestFunction::NOTHING;
+  TestFunction::Result e1r3 = TestFunction::NOTHING;
+  TestFunction::Result e2r1 = TestFunction::NOTHING;
+  TestFunction::Result e2r2 = TestFunction::NOTHING;
+  TestFunction::Result e2r3 = TestFunction::NOTHING;
+
+  // Add some tasks to the executors.  Each one takes 50 millis to run.
+  executor1->AddTask(new TestFunction(50, &lock, &e1r1), 0);
+  executor2->AddTask(new TestFunction(50, &lock, &e2r1), 0);
+  executor1->AddTask(new TestFunction(50, &lock, &e1r2), 0);
+  executor2->AddTask(new TestFunction(50, &lock, &e2r2), 1);
+  executor1->AddTask(new TestFunction(50, &lock, &e1r3), 3);
+  executor2->AddTask(new TestFunction(50, &lock, &e2r3), 1);
+
+  // Wait 20 millis (to make sure the first few tasks got picked up), then
+  // destroy executor2, which should stop it.  Finally, sleep another 100
+  // millis to give the remaining tasks a chance to finish.
+  base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20));
+  executor2.reset();
+  base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
+
+  // The three high priority tasks should have all run.  The other two tasks on
+  // executor2 should have been cancelled when we stopped executor2, but the
+  // low-priority task on executor1 should have been left untouched, and
+  // allowed to finish.
+  base::AutoLock autolock(lock);
+  EXPECT_EQ(TestFunction::RAN, e1r1);
+  EXPECT_EQ(TestFunction::RAN, e2r1);
+  EXPECT_EQ(TestFunction::RAN, e1r2);
+  EXPECT_EQ(TestFunction::CANCELLED, e2r2);
+  EXPECT_EQ(TestFunction::RAN, e1r3);
+  EXPECT_EQ(TestFunction::CANCELLED, e2r3);
+}
+
+// When run, a WaitFunction blocks until the notification is set.
+class WaitFunction : public net_instaweb::Function {
+ public:
+  WaitFunction(mod_spdy::testing::Notification* notification)
+      : notification_(notification) {}
+  virtual ~WaitFunction() {}
+ protected:
+  // net_instaweb::Function methods:
+  virtual void Run() {
+    notification_->Wait();
+  }
+  virtual void Cancel() {}
+ private:
+  mod_spdy::testing::Notification* const notification_;
+  DISALLOW_COPY_AND_ASSIGN(WaitFunction);
+};
+
+// When run, an IdFunction pushes its ID onto the vector.
+class IdFunction : public net_instaweb::Function {
+ public:
+  IdFunction(int id, base::Lock* lock, base::ConditionVariable* condvar,
+             std::vector<int>* output)
+      : id_(id), lock_(lock), condvar_(condvar), output_(output) {}
+  virtual ~IdFunction() {}
+ protected:
+  // net_instaweb::Function methods:
+  virtual void Run() {
+    base::AutoLock autolock(*lock_);
+    output_->push_back(id_);
+    condvar_->Broadcast();
+  }
+  virtual void Cancel() {}
+ private:
+  const int id_;
+  base::Lock* const lock_;
+  base::ConditionVariable* const condvar_;
+  std::vector<int>* const output_;
+  DISALLOW_COPY_AND_ASSIGN(IdFunction);
+};
+
+// Test that if many tasks of the same priority are added, they are run in the
+// order they were added.
+TEST(ThreadPoolTest, SamePriorityTasksAreFIFO) {
+  // Create a thread pool with just one thread, and an executor.
+  mod_spdy::ThreadPool thread_pool(1, 1);
+  ASSERT_TRUE(thread_pool.Start());
+  scoped_ptr<mod_spdy::Executor> executor(thread_pool.NewExecutor());
+
+  // First, make sure no other tasks will get started until we set the
+  // notification.
+  mod_spdy::testing::Notification start;
+  executor->AddTask(new WaitFunction(&start), 0);
+
+  // Add many tasks to the executor, of varying priorities.
+  const int num_tasks_each_priority = 1000;
+  const int total_num_tasks = 3 * num_tasks_each_priority;
+  base::Lock lock;
+  base::ConditionVariable condvar(&lock);
+  std::vector<int> ids;  // protected by lock
+  for (int id = 0; id < num_tasks_each_priority; ++id) {
+    executor->AddTask(new IdFunction(id, &lock, &condvar, &ids), 1);
+    executor->AddTask(new IdFunction(id + num_tasks_each_priority,
+                                     &lock, &condvar, &ids), 2);
+    executor->AddTask(new IdFunction(id + 2 * num_tasks_each_priority,
+                                     &lock, &condvar, &ids), 3);
+  }
+
+  // Start us off, then wait for all tasks to finish.
+  start.Set();
+  base::AutoLock autolock(lock);
+  while (static_cast<int>(ids.size()) < total_num_tasks) {
+    condvar.Wait();
+  }
+
+  // Check that the tasks were executed in order by the one worker thread.
+  for (int index = 0; index < total_num_tasks; ++index) {
+    ASSERT_EQ(index, ids[index])
+        << "Task " << ids[index] << " finished in position " << index;
+  }
+}
+
+// Add a test failure if the thread pool does not stabilize to the expected
+// total/idle number of worker threads withing the given timeout.
+void ExpectWorkersWithinTimeout(int expected_num_workers,
+                                int expected_num_idle_workers,
+                                mod_spdy::ThreadPool* thread_pool,
+                                int timeout_millis) {
+  int millis_remaining = timeout_millis;
+  while (true) {
+    const int actual_num_workers = thread_pool->GetNumWorkersForTest();
+    const int actual_num_idle_workers =
+        thread_pool->GetNumIdleWorkersForTest();
+    if (actual_num_workers == expected_num_workers &&
+        actual_num_idle_workers == expected_num_idle_workers) {
+      return;
+    }
+    if (millis_remaining <= 0) {
+      ADD_FAILURE() << "Timed out; expected " << expected_num_workers
+                    << " worker(s) with " << expected_num_idle_workers
+                    <<" idle; still at " << actual_num_workers
+                    << " worker(s) with " << actual_num_idle_workers
+                    << " idle after " << timeout_millis << "ms";
+      return;
+    }
+    base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(10));
+    millis_remaining -= 10;
+  }
+}
+
+// Test that we spawn new threads as needed, and allow them to die off after
+// being idle for a while.
+TEST(ThreadPoolTest, CreateAndRetireWorkers) {
+  // Create a thread pool with min_threads < max_threads, and give it a short
+  // max_thread_idle_time.
+  const int idle_time_millis = 100;
+  mod_spdy::ThreadPool thread_pool(
+      2, 4, base::TimeDelta::FromMilliseconds(idle_time_millis));
+  ASSERT_TRUE(thread_pool.Start());
+  // As soon as we start the thread pool, there should be the minimum number of
+  // workers (two), both counted as idle.
+  EXPECT_EQ(2, thread_pool.GetNumWorkersForTest());
+  EXPECT_EQ(2, thread_pool.GetNumIdleWorkersForTest());
+
+  scoped_ptr<mod_spdy::Executor> executor(thread_pool.NewExecutor());
+
+  // Start up three tasks.  That should push us up to three workers
+  // immediately.  If we make sure to give those threads a chance to run, they
+  // should soon pick up the tasks and all be busy.
+  mod_spdy::testing::Notification done1;
+  executor->AddTask(new WaitFunction(&done1), 0);
+  executor->AddTask(new WaitFunction(&done1), 1);
+  executor->AddTask(new WaitFunction(&done1), 2);
+  EXPECT_EQ(3, thread_pool.GetNumWorkersForTest());
+  ExpectWorkersWithinTimeout(3, 0, &thread_pool, 100);
+
+  // Add three more tasks.  We should now be at the maximum number of workers,
+  // and that fourth worker should be busy soon.
+  mod_spdy::testing::Notification done2;
+  executor->AddTask(new WaitFunction(&done2), 1);
+  executor->AddTask(new WaitFunction(&done2), 2);
+  executor->AddTask(new WaitFunction(&done2), 3);
+  EXPECT_EQ(4, thread_pool.GetNumWorkersForTest());
+  ExpectWorkersWithinTimeout(4, 0, &thread_pool, 100);
+
+  // Allow the first group of tasks to finish.  There are now only three tasks
+  // running, so one of our four threads should go idle.  If we wait for a
+  // while after that, that thread should terminate and enter zombie mode.
+  done1.Set();
+  ExpectWorkersWithinTimeout(4, 1, &thread_pool, idle_time_millis / 2);
+  ExpectWorkersWithinTimeout(3, 0, &thread_pool, 2 * idle_time_millis);
+  EXPECT_EQ(1, thread_pool.GetNumZombiesForTest());
+
+  // Allow the second group of tasks to finish.  There are no tasks left, so
+  // all three threads should go idle.  If we wait for a while after that,
+  // exactly one of the three should shut down, bringing us back down to the
+  // minimum number of threads.  We should now have two zombie threads.
+  done2.Set();
+  ExpectWorkersWithinTimeout(3, 3, &thread_pool, idle_time_millis / 2);
+  ExpectWorkersWithinTimeout(2, 2, &thread_pool, 2 * idle_time_millis);
+  EXPECT_EQ(2, thread_pool.GetNumZombiesForTest());
+
+  // Start some new new tasks.  This should cause us to immediately reap the
+  // zombie threads, and soon, we should have three busy threads.
+  mod_spdy::testing::Notification done3;
+  executor->AddTask(new WaitFunction(&done3), 0);
+  executor->AddTask(new WaitFunction(&done3), 2);
+  executor->AddTask(new WaitFunction(&done3), 1);
+  EXPECT_EQ(0, thread_pool.GetNumZombiesForTest());
+  EXPECT_EQ(3, thread_pool.GetNumWorkersForTest());
+  ExpectWorkersWithinTimeout(3, 0, &thread_pool, 100);
+
+  // Let those tasks finish.  Once again, the threads should go idle, and then
+  // one of them should terminate and enter zombie mode.
+  done3.Set();
+  ExpectWorkersWithinTimeout(3, 3, &thread_pool, idle_time_millis / 2);
+  ExpectWorkersWithinTimeout(2, 2, &thread_pool, 2 * idle_time_millis);
+  EXPECT_EQ(1, thread_pool.GetNumZombiesForTest());
+
+  // When we exit the test, the thread pool's destructor should reap the zombie
+  // thread (as well as shutting down the still-running workers).  We can
+  // verify this by running this test under valgrind and making sure that no
+  // memory is leaked.
+}
+
+}  // namespace

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/version.h.in
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/version.h.in?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/version.h.in (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/version.h.in Thu May  1 11:39:27 2014
@@ -0,0 +1,25 @@
+// Copyright (c) 2010 The Chromium Authors.  All rights reserved.
+// Use of this source is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// version.h is generated from version.h.in.  Edit the source!
+
+#pragma once
+
+// Version Information
+
+#define MOD_SPDY_VERSION @MAJOR@,@MINOR@,@BUILD@,@PATCH@
+#define MOD_SPDY_VERSION_STRING "@MAJOR@.@MINOR@.@BUILD@.@PATCH@"
+
+// Branding Information
+
+#define COMPANY_FULLNAME_STRING "@COMPANY_FULLNAME@"
+#define COMPANY_SHORTNAME_STRING "@COMPANY_SHORTNAME@"
+#define PRODUCT_FULLNAME_STRING "@PRODUCT_FULLNAME@"
+#define PRODUCT_SHORTNAME_STRING "@PRODUCT_SHORTNAME@"
+#define COPYRIGHT_STRING "@COPYRIGHT@"
+#define OFFICIAL_BUILD_STRING "@OFFICIAL_BUILD@"
+
+// Changelist Information
+
+#define LASTCHANGE_STRING "@LASTCHANGE@"

Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/version.h.in
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message