httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject svn commit: r1591622 [12/33] - in /httpd/mod_spdy/trunk: ./ base/ base/base.xcodeproj/ base/metrics/ build/ build/all.xcodeproj/ build/build_util.xcodeproj/ build/install.xcodeproj/ build/internal/ build/linux/ build/mac/ build/util/ build/win/ install...
Date Thu, 01 May 2014 11:43:45 GMT
Added: httpd/mod_spdy/trunk/mod_spdy/common/spdy_session_test.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/trunk/mod_spdy/common/spdy_session_test.cc?rev=1591622&view=auto
==============================================================================
--- httpd/mod_spdy/trunk/mod_spdy/common/spdy_session_test.cc (added)
+++ httpd/mod_spdy/trunk/mod_spdy/common/spdy_session_test.cc Thu May  1 11:43:36 2014
@@ -0,0 +1,981 @@
+// Copyright 2010 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_session.h"
+
+#include <list>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "mod_spdy/common/spdy_server_config.h"
+#include "mod_spdy/common/spdy_session_io.h"
+#include "mod_spdy/common/spdy_stream_task_factory.h"
+#include "mod_spdy/common/testing/spdy_frame_matchers.h"
+#include "mod_spdy/common/thread_pool.h"
+#include "net/instaweb/util/public/function.h"
+#include "net/spdy/buffered_spdy_framer.h"
+#include "net/spdy/spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using mod_spdy::testing::AssociatedStreamIdIs;
+using mod_spdy::testing::FlagFinIs;
+using mod_spdy::testing::FlagUnidirectionalIs;
+using mod_spdy::testing::IsControlFrameOfType;
+using mod_spdy::testing::IsDataFrameWith;
+using mod_spdy::testing::IsGoAway;
+using mod_spdy::testing::IsRstStream;
+using mod_spdy::testing::PriorityIs;
+using mod_spdy::testing::StreamIdIs;
+using testing::_;
+using testing::AllOf;
+using testing::AtLeast;
+using testing::DoAll;
+using testing::Eq;
+using testing::Invoke;
+using testing::InvokeWithoutArgs;
+using testing::NotNull;
+using testing::Property;
+using testing::Return;
+using testing::WithArg;
+
+namespace {
+
+void AddRequiredHeaders(net::SpdyHeaderBlock* headers) {
+  (*headers)["host"] = "www.example.com";
+  (*headers)["method"] = "GET";
+  (*headers)["scheme"] = "https";
+  (*headers)["url"] = "/foo/index.html";
+  (*headers)["version"] = "HTTP/1.1";
+}
+
+class MockSpdySessionIO : public mod_spdy::SpdySessionIO {
+ public:
+  MOCK_METHOD0(IsConnectionAborted, bool());
+  MOCK_METHOD2(ProcessAvailableInput,
+               ReadStatus(bool, net::BufferedSpdyFramer*));
+  MOCK_METHOD1(SendFrameRaw, WriteStatus(const net::SpdyFrame&));
+};
+
+class MockSpdyStreamTaskFactory : public mod_spdy::SpdyStreamTaskFactory {
+ public:
+  MOCK_METHOD1(NewStreamTask, net_instaweb::Function*(mod_spdy::SpdyStream*));
+};
+
+class MockStreamTask : public net_instaweb::Function {
+ public:
+  MockStreamTask() : stream(NULL) {}
+  MOCK_METHOD0(Run, void());
+  MOCK_METHOD0(Cancel, void());
+  mod_spdy::SpdyStream* stream;
+ private:
+  DISALLOW_COPY_AND_ASSIGN(MockStreamTask);
+};
+
+// gMock action to be used with NewStreamTask.
+ACTION_P(ReturnMockTask, task) {
+  task->stream = arg0;
+  return task;
+}
+
+ACTION_P5(StartServerPush, session, stream_id, pri, url, expected_status) {
+  net::SpdyHeaderBlock push_headers;
+  push_headers[":host"] = "www.example.com";
+  push_headers[":method"] = "GET";
+  push_headers[":path"] = url;
+  push_headers[":scheme"] = "https";
+  push_headers[":version"] = "HTTP/1.1";
+  EXPECT_EQ(expected_status,
+            session->StartServerPush(stream_id, 0, pri, push_headers));
+}
+
+ACTION_P(SendResponseHeaders, task) {
+  net::SpdyHeaderBlock headers;
+  const bool spdy2 =
+      task->stream->spdy_version() < mod_spdy::spdy::SPDY_VERSION_3;
+  headers[spdy2 ? mod_spdy::spdy::kSpdy2Status :
+          mod_spdy::spdy::kSpdy3Status] = "200";
+  headers[spdy2 ? mod_spdy::spdy::kSpdy2Version :
+          mod_spdy::spdy::kSpdy3Version] = "HTTP/1.1";
+  if (task->stream->is_server_push()) {
+    task->stream->SendOutputHeaders(headers, false);
+  } else {
+    task->stream->SendOutputSynReply(headers, false);
+  }
+}
+
+ACTION_P3(SendDataFrame, task, data, fin) {
+  task->stream->SendOutputDataFrame(data, fin);
+}
+
+// TODO(mdsteele): Convert other tests to use MockStreamTask instead of
+//   FakeStreamTask; this should make the tests easier to read and more
+//   flexible.
+class FakeStreamTask : public net_instaweb::Function {
+ public:
+  virtual ~FakeStreamTask() {}
+  static FakeStreamTask* SimpleResponse(mod_spdy::SpdyStream* stream) {
+    return new FakeStreamTask(NULL, stream);
+  }
+  static FakeStreamTask* ResponseWithServerPush(
+      mod_spdy::SpdySession* session, mod_spdy::SpdyStream* stream) {
+    return new FakeStreamTask(session, stream);
+  }
+
+ protected:
+  // net_instaweb::Function methods:
+  virtual void Run();
+  virtual void Cancel() {}
+
+ private:
+  FakeStreamTask(mod_spdy::SpdySession* session, mod_spdy::SpdyStream* stream)
+      : session_(session), stream_(stream) {}
+
+  mod_spdy::SpdySession* const session_;
+  mod_spdy::SpdyStream* const stream_;
+
+  DISALLOW_COPY_AND_ASSIGN(FakeStreamTask);
+};
+
+void FakeStreamTask::Run() {
+  {
+    net::SpdyFrame* raw_frame;
+    ASSERT_TRUE(stream_->GetInputFrame(false, &raw_frame));
+    scoped_ptr<net::SpdyFrame> frame(raw_frame);
+    ASSERT_TRUE(frame != NULL);
+    EXPECT_THAT(*frame, AllOf(IsControlFrameOfType(net::SYN_STREAM),
+                              StreamIdIs(stream_->stream_id()),
+                              FlagUnidirectionalIs(false)));
+  }
+
+  {
+    net::SpdyHeaderBlock headers;
+    bool spdy2 = stream_->spdy_version() < mod_spdy::spdy::SPDY_VERSION_3;
+    headers[spdy2 ? mod_spdy::spdy::kSpdy2Status :
+            mod_spdy::spdy::kSpdy3Status] = "200";
+    headers[spdy2 ? mod_spdy::spdy::kSpdy2Version :
+            mod_spdy::spdy::kSpdy3Version] = "HTTP/1.1";
+    if (stream_->is_server_push()) {
+      stream_->SendOutputHeaders(headers, false);
+    } else {
+      stream_->SendOutputSynReply(headers, false);
+    }
+  }
+
+  if (session_ != NULL) {
+    const net::SpdyPriority priority = 3;
+    net::SpdyHeaderBlock push_headers;
+    push_headers[":host"] = "www.example.com";
+    push_headers[":method"] = "GET";
+    push_headers[":path"] = "/styles/main.css";
+    push_headers[":scheme"] = "https";
+    push_headers[":version"] = "HTTP/1.1";
+    session_->StartServerPush(stream_->stream_id(), 0, priority, push_headers);
+  }
+
+  stream_->SendOutputDataFrame("foobar", false);
+  stream_->SendOutputDataFrame("quux", true);
+}
+
+// An executor that runs all tasks in the same thread, either immediately when
+// they are added or when it is told to run them.
+class InlineExecutor : public mod_spdy::Executor {
+ public:
+  InlineExecutor() : run_on_add_(false), stopped_(false) {}
+  virtual ~InlineExecutor() { Stop(); }
+
+  virtual void AddTask(net_instaweb::Function* task,
+                       net::SpdyPriority priority) {
+    if (stopped_) {
+      task->CallCancel();
+    } else if (run_on_add_) {
+      task->CallRun();
+    } else {
+      tasks_.push_back(task);
+    }
+  }
+  virtual void Stop() {
+    stopped_ = true;
+    while (!tasks_.empty()) {
+      tasks_.front()->CallCancel();
+      tasks_.pop_front();
+    }
+  }
+  void RunOne() {
+    if (!tasks_.empty()) {
+      tasks_.front()->CallRun();
+      tasks_.pop_front();
+    }
+  }
+  void RunAll() {
+    while (!tasks_.empty()) {
+      RunOne();
+    }
+  }
+  void set_run_on_add(bool run) { run_on_add_ = run; }
+  bool stopped() const { return stopped_; }
+
+ private:
+  std::list<net_instaweb::Function*> tasks_;
+  bool run_on_add_;
+  bool stopped_;
+
+  DISALLOW_COPY_AND_ASSIGN(InlineExecutor);
+};
+
+// Base class for SpdySession tests.
+class SpdySessionTestBase :
+      public testing::TestWithParam<mod_spdy::spdy::SpdyVersion> {
+ public:
+  SpdySessionTestBase()
+      : spdy_version_(GetParam()),
+        framer_(mod_spdy::SpdyVersionToFramerVersion(spdy_version_)),
+        close_after_input_(false) {
+    ON_CALL(session_io_, IsConnectionAborted()).WillByDefault(Return(false));
+    ON_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+        .WillByDefault(Invoke(this, &SpdySessionTestBase::ReadNextInputChunk));
+    ON_CALL(session_io_, SendFrameRaw(_))
+        .WillByDefault(Return(mod_spdy::SpdySessionIO::WRITE_SUCCESS));
+  }
+
+  // Use as gMock action for ProcessAvailableInput:
+  //   Invoke(this, &SpdySessionTest::ReadNextInputChunk)
+  mod_spdy::SpdySessionIO::ReadStatus ReadNextInputChunk(
+      bool block, net::BufferedSpdyFramer* framer) {
+    if (input_queue_.empty()) {
+      return (!close_after_input_ ? mod_spdy::SpdySessionIO::READ_NO_DATA :
+              mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED);
+    }
+    const std::string chunk = input_queue_.front();
+    input_queue_.pop_front();
+    framer->ProcessInput(chunk.data(), chunk.size());
+    return (framer->HasError() ? mod_spdy::SpdySessionIO::READ_ERROR :
+            mod_spdy::SpdySessionIO::READ_SUCCESS);
+  }
+
+  // Use as gMock action for SendFrameRaw:
+  //   Invoke(this, &SpdySessionTest::ReceiveAndClose)
+  // This indicates that the simulated client is satisfied and will close the
+  // connection (once it is done sending input).
+  mod_spdy::SpdySessionIO::WriteStatus ReceiveAndClose(
+      const net::SpdyFrame& frame) {
+    close_after_input_ = true;
+    return mod_spdy::SpdySessionIO::WRITE_SUCCESS;
+  }
+
+  // Push a SETTINGS frame into the input queue.
+  void PushSettingsFrame(net::SpdySettingsIds setting, uint32 value) {
+    net::SettingsMap settings;
+    settings[setting] = std::make_pair(net::SETTINGS_FLAG_NONE, value);
+    scoped_ptr<net::SpdySettingsControlFrame> frame(
+        framer_.CreateSettings(settings));
+    PushFrame(*frame);
+  }
+
+ protected:
+  // Push some random garbage bytes into the input queue.
+  void PushGarbageData() {
+    input_queue_.push_back("\x88\x5f\x92\x02\xf8\x92\x12\xd1"
+                           "\x82\xdc\x1a\x40\xbb\xb2\x9d\x13");
+  }
+
+  // Push a frame into the input queue.
+  void PushFrame(const net::SpdyFrame& frame) {
+    input_queue_.push_back(mod_spdy::FrameData(frame).as_string());
+  }
+
+  // Push a PING frame into the input queue.
+  void PushPingFrame(uint32 id) {
+    scoped_ptr<net::SpdyPingControlFrame> frame(framer_.CreatePingFrame(id));
+    PushFrame(*frame);
+  }
+
+  // Push a valid SYN_STREAM frame into the input queue.
+  void PushSynStreamFrame(net::SpdyStreamId stream_id,
+                          net::SpdyPriority priority,
+                          net::SpdyControlFlags flags) {
+    net::SpdyHeaderBlock headers;
+    AddRequiredHeaders(&headers);
+    scoped_ptr<net::SpdySynStreamControlFrame> frame(framer_.CreateSynStream(
+        stream_id, 0, priority, 0, flags,
+        true,  // true = use compression
+        &headers));
+    PushFrame(*frame);
+  }
+
+  const mod_spdy::spdy::SpdyVersion spdy_version_;
+  net::SpdyFramer framer_;
+  mod_spdy::SpdyServerConfig config_;
+  MockSpdySessionIO session_io_;
+  MockSpdyStreamTaskFactory task_factory_;
+  std::list<std::string> input_queue_;
+  bool close_after_input_;
+};
+
+ACTION_P3(ReceiveSettingsFrame, test, key, value) {
+  test->PushSettingsFrame(key, value);
+}
+
+// Class for most SpdySession tests; this uses an InlineExecutor, so that test
+// behavior is very predictable.
+class SpdySessionTest : public SpdySessionTestBase {
+ public:
+  SpdySessionTest()
+      : session_(spdy_version_, &config_, &session_io_, &task_factory_,
+                 &executor_) {}
+
+  FakeStreamTask* ResponseWithServerPush(
+      mod_spdy::SpdyStream* stream) {
+    return FakeStreamTask::ResponseWithServerPush(&session_, stream);
+  }
+
+  // Use as gMock action for SendFrameRaw:
+  //   Invoke(this, &SpdySessionTest::ProcessOutputWithFlowControl)
+  mod_spdy::SpdySessionIO::WriteStatus ProcessOutputWithFlowControl(
+      const net::SpdyFrame& frame) {
+    // For SPDY v3 and above, send back a WINDOW_UPDATE frame saying we
+    // consumed the data frame.
+    if (session_.spdy_version() >= mod_spdy::spdy::SPDY_VERSION_3 &&
+        !frame.is_control_frame()) {
+      const net::SpdyDataFrame& data_frame =
+          *static_cast<const net::SpdyDataFrame*>(&frame);
+      scoped_ptr<net::SpdyWindowUpdateControlFrame> window_update_frame(
+          framer_.CreateWindowUpdate(data_frame.stream_id(),
+                                     data_frame.length()));
+      PushFrame(*window_update_frame);
+    }
+    return mod_spdy::SpdySessionIO::WRITE_SUCCESS;
+  }
+
+ protected:
+  InlineExecutor executor_;
+  mod_spdy::SpdySession session_;
+};
+
+// Test that if the connection is already closed, we stop immediately.
+TEST_P(SpdySessionTest, ConnectionAlreadyClosed) {
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)))
+      .WillOnce(Return(mod_spdy::SpdySessionIO::WRITE_CONNECTION_CLOSED));
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Test that when the connection is aborted, we stop.
+TEST_P(SpdySessionTest, ImmediateConnectionAbort) {
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted()).WillOnce(Return(true));
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Test responding to a PING frame from the client (followed by the connection
+// closing, so that we can exit the Run loop).
+TEST_P(SpdySessionTest, SinglePing) {
+  PushPingFrame(1);
+
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::PING)))
+      .WillOnce(Invoke(this, &SpdySessionTestBase::ReceiveAndClose));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_OK)));
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Test handling a single stream request.
+TEST_P(SpdySessionTest, SingleStream) {
+  executor_.set_run_on_add(true);
+  const net::SpdyStreamId stream_id = 1;
+  const net::SpdyPriority priority = 2;
+  PushSynStreamFrame(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .WillOnce(WithArg<0>(Invoke(FakeStreamTask::SimpleResponse)));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::SYN_REPLY), StreamIdIs(stream_id),
+            FlagFinIs(false))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("foobar"), FlagFinIs(false))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("quux"), FlagFinIs(true))))
+      .WillOnce(Invoke(this, &SpdySessionTestBase::ReceiveAndClose));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_OK)));
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Test that if SendFrameRaw fails, we immediately stop trying to send data and
+// shut down the session.
+TEST_P(SpdySessionTest, ShutDownSessionIfSendFrameRawFails) {
+  executor_.set_run_on_add(true);
+  const net::SpdyStreamId stream_id = 1;
+  const net::SpdyPriority priority = 2;
+  PushSynStreamFrame(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  testing::InSequence seq;
+  // We start out the same way as in the SingleStream test above.
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, _));
+  EXPECT_CALL(task_factory_, NewStreamTask(_))
+      .WillOnce(WithArg<0>(Invoke(FakeStreamTask::SimpleResponse)));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::SYN_REPLY), StreamIdIs(stream_id),
+            FlagFinIs(false))));
+  // At this point, the connection is closed by the client.
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("foobar"), FlagFinIs(false))))
+      .WillOnce(Return(mod_spdy::SpdySessionIO::WRITE_CONNECTION_CLOSED));
+  // Even though we have another frame to send at this point (already in the
+  // output queue), we immediately stop sending data and exit the session.
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Test that when the client sends us garbage data, we send a GOAWAY frame and
+// then quit.
+TEST_P(SpdySessionTest, SendGoawayInResponseToGarbage) {
+  PushGarbageData();
+
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_PROTOCOL_ERROR)));
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Test that when the client sends us a SYN_STREAM with a corrupted header
+// block, we send a GOAWAY frame and then quit.
+TEST_P(SpdySessionTest, SendGoawayForBadSynStreamCompression) {
+  net::SpdyHeaderBlock headers;
+  headers["foobar"] = "Foo is to bar as bar is to baz.";
+  scoped_ptr<net::SpdyFrame> frame(framer_.CreateSynStream(
+      1, 0, framer_.GetHighestPriority(), 0, net::CONTROL_FLAG_FIN,
+      false,  // false = no compression
+      &headers));
+  PushFrame(*frame);
+
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_PROTOCOL_ERROR)));
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// TODO(mdsteele): At the moment, SpdyFramer DCHECKs that the stream ID is
+// nonzero when decoding, so this test would crash in debug builds.  Once this
+// has been corrected in the Chromium code, we can remove this #ifdef.
+#ifdef NDEBUG
+// Test that when the client sends us a SYN_STREAM with a stream ID of 0, we
+// send a GOAWAY frame and then quit.
+TEST_P(SpdySessionTest, SendGoawayForSynStreamIdZero) {
+  net::SpdyHeaderBlock headers;
+  AddRequiredHeaders(&headers);
+  // SpdyFramer DCHECKS that the stream_id isn't zero, so just create the frame
+  // with a stream_id of 1, and then set the stream_id on the next line.
+  scoped_ptr<net::SpdySynStreamControlFrame> frame(framer_.CreateSynStream(
+      1, 0, framer_.GetHighestPriority(), 0, net::CONTROL_FLAG_FIN, true,
+      &headers));
+  frame->set_stream_id(0);
+  PushFrame(*frame);
+
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_PROTOCOL_ERROR)));
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+#endif
+
+// TODO(mdsteele): At the moment, the SpdyFramer API has changed such that it
+// no longer gives us a chance to validate SYN_STREAM flags, so we can't pass
+// this test.  There is a TODO in the SpdyFramer code to validate flags, so
+// hopefully we can reenable this test in the future.
+#if 0
+// Test that when the client sends us a SYN_STREAM with invalid flags, we
+// send a GOAWAY frame and then quit.
+TEST_P(SpdySessionTest, SendGoawayForSynStreamWithInvalidFlags) {
+  net::SpdyHeaderBlock headers;
+  AddRequiredHeaders(&headers);
+  // SpdyFramer DCHECKS that the flags are valid, so just create the frame
+  // with no flags, and then set the flags on the next line.
+  scoped_ptr<net::SpdySynStreamControlFrame> frame(framer_.CreateSynStream(
+      1, 0, framer_.GetHighestPriority(), 0, net::CONTROL_FLAG_NONE, true,
+      &headers));
+  frame->set_flags(0x47);
+  PushFrame(*frame);
+
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_PROTOCOL_ERROR)));
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+#endif
+
+// Test that when the client sends us two SYN_STREAMs with the same ID, we send
+// a GOAWAY frame (but still finish out the good stream before quitting).
+TEST_P(SpdySessionTest, SendGoawayForDuplicateStreamId) {
+  executor_.set_run_on_add(false);
+  const net::SpdyStreamId stream_id = 1;
+  const net::SpdyPriority priority = 2;
+  PushSynStreamFrame(stream_id, priority, net::CONTROL_FLAG_FIN);
+  PushSynStreamFrame(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  // Get the first SYN_STREAM; it looks good, so create a new task (but because
+  // we set executor_.set_run_on_add(false) above, it doesn't execute yet).
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .WillOnce(WithArg<0>(Invoke(FakeStreamTask::SimpleResponse)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  // There's an active stream out, so ProcessAvailableInput should have false
+  // for the first argument (false = nonblocking read).  Here we get the second
+  // SYN_STREAM with the same stream ID, so we should send GOAWAY.
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(false), NotNull()));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_PROTOCOL_ERROR)));
+  // At this point, tell the executor to run the task.
+  EXPECT_CALL(session_io_, IsConnectionAborted())
+      .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunAll),
+                      Return(false)));
+  // The stream is no longer active, but there are pending frames to send, so
+  // we shouldn't block on input.
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(false), NotNull()));
+  // Now we should send the output.
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::SYN_REPLY), StreamIdIs(stream_id),
+            FlagFinIs(false))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("foobar"), FlagFinIs(false))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("quux"), FlagFinIs(true))));
+  // Finally, there is no more output to send, and no chance of creating new
+  // streams (since we GOAWAY'd), so we quit.
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Run each test over both SPDY v2 and SPDY v3.
+INSTANTIATE_TEST_CASE_P(Spdy2And3, SpdySessionTest, testing::Values(
+    mod_spdy::spdy::SPDY_VERSION_2, mod_spdy::spdy::SPDY_VERSION_3,
+    mod_spdy::spdy::SPDY_VERSION_3_1));
+
+// Create a type alias so that we can instantiate some of our
+// SpdySessionTest-based tests using a different set of parameters.
+typedef SpdySessionTest SpdySessionNoFlowControlTest;
+
+// Test that we send GOAWAY if the client tries to send
+// SETTINGS_INITIAL_WINDOW_SIZE over SPDY v2.
+TEST_P(SpdySessionNoFlowControlTest, SendGoawayForInitialWindowSize) {
+  net::SettingsMap settings;
+  settings[net::SETTINGS_INITIAL_WINDOW_SIZE] =
+      std::make_pair(net::SETTINGS_FLAG_NONE, 4000);
+  scoped_ptr<net::SpdyFrame> frame(framer_.CreateSettings(settings));
+  PushFrame(*frame);
+
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_PROTOCOL_ERROR)));
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Only run no-flow-control tests for SPDY v2.
+INSTANTIATE_TEST_CASE_P(Spdy2, SpdySessionNoFlowControlTest, testing::Values(
+    mod_spdy::spdy::SPDY_VERSION_2));
+
+// Test class for flow-control tests.  This uses a ThreadPool Executor, so that
+// we can test concurrency behavior.
+class SpdySessionFlowControlTest : public SpdySessionTestBase {
+ public:
+  SpdySessionFlowControlTest()
+      : thread_pool_(1, 1) {
+    ON_CALL(session_io_, SendFrameRaw(_)).WillByDefault(Invoke(
+        this, &SpdySessionFlowControlTest::ProcessOutputWithFlowControl));
+  }
+
+  void SetUp() {
+    ASSERT_TRUE(thread_pool_.Start());
+    executor_.reset(thread_pool_.NewExecutor());
+    session_.reset(new mod_spdy::SpdySession(
+        spdy_version_, &config_, &session_io_, &task_factory_,
+        executor_.get()));
+  }
+
+  // Use as gMock action for SendFrameRaw:
+  //   Invoke(this, &SpdySessionTest::ProcessOutputWithFlowControl)
+  mod_spdy::SpdySessionIO::WriteStatus ProcessOutputWithFlowControl(
+      const net::SpdyFrame& frame) {
+    // For SPDY v3 and above, send back a WINDOW_UPDATE frame saying we
+    // consumed the data frame.
+    if (session_->spdy_version() >= mod_spdy::spdy::SPDY_VERSION_3 &&
+        !frame.is_control_frame()) {
+      const net::SpdyDataFrame& data_frame =
+          *static_cast<const net::SpdyDataFrame*>(&frame);
+      scoped_ptr<net::SpdyWindowUpdateControlFrame> window_update_frame(
+          framer_.CreateWindowUpdate(data_frame.stream_id(),
+                                     data_frame.length()));
+      PushFrame(*window_update_frame);
+    }
+    return mod_spdy::SpdySessionIO::WRITE_SUCCESS;
+  }
+
+ protected:
+  mod_spdy::ThreadPool thread_pool_;
+  scoped_ptr<mod_spdy::Executor> executor_;
+  scoped_ptr<mod_spdy::SpdySession> session_;
+};
+
+TEST_P(SpdySessionFlowControlTest, SingleStreamWithFlowControl) {
+  // Start by setting the initial window size to very small (three bytes).
+  PushSettingsFrame(net::SETTINGS_INITIAL_WINDOW_SIZE, 3);
+  // Then send a SYN_STREAM.
+  const net::SpdyStreamId stream_id = 1;
+  const net::SpdyPriority priority = 2;
+  PushSynStreamFrame(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  // We'll have to go through the loop at least six times (once for each of
+  // five frames that we _must_ receive (SETTINGS, SYN_STREAM, and three
+  // WINDOW_UDPATEs), and once to determine that the connection is closed).
+  EXPECT_CALL(session_io_, IsConnectionAborted()).Times(AtLeast(6));
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+      .Times(AtLeast(6));
+
+  // The rest of these will have to happen in a fixed order.
+  testing::Sequence s1;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)))
+      .InSequence(s1);
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .InSequence(s1)
+      .WillOnce(WithArg<0>(Invoke(FakeStreamTask::SimpleResponse)));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::SYN_REPLY), StreamIdIs(stream_id),
+            FlagFinIs(false))))
+      .InSequence(s1);
+  // Since the window size is just three bytes, we can only send three bytes at
+  // a time.
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("foo"), FlagFinIs(false)))).InSequence(s1);
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("bar"), FlagFinIs(false)))).InSequence(s1);
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("quu"), FlagFinIs(false)))).InSequence(s1);
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("x"), FlagFinIs(true)))).InSequence(s1)
+      .WillOnce(Invoke(this, &SpdySessionTestBase::ReceiveAndClose));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_OK)));
+
+  session_->Run();
+}
+
+// Suppose the input side of the connection closes while we're blocked on flow
+// control; we should abort the blocked streams.
+TEST_P(SpdySessionFlowControlTest, CeaseInputWithFlowControl) {
+  // For this test, we will not be sending WINDOW_UDPATE frames.
+  ON_CALL(session_io_, SendFrameRaw(_))
+      .WillByDefault(Return(mod_spdy::SpdySessionIO::WRITE_SUCCESS));
+
+  // Start by setting the initial window size to very small (three bytes).
+  PushSettingsFrame(net::SETTINGS_INITIAL_WINDOW_SIZE, 3);
+  // Then send a SYN_STREAM.
+  const net::SpdyStreamId stream_id = 1;
+  const net::SpdyPriority priority = 2;
+  PushSynStreamFrame(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  EXPECT_CALL(session_io_, IsConnectionAborted()).Times(AtLeast(1));
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+      .Times(AtLeast(1));
+
+  // The rest of these will have to happen in a fixed order.
+  testing::Sequence s1;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)))
+      .InSequence(s1);
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .InSequence(s1)
+      .WillOnce(WithArg<0>(Invoke(FakeStreamTask::SimpleResponse)));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::SYN_REPLY), StreamIdIs(stream_id),
+            FlagFinIs(false))))
+      .InSequence(s1);
+  // Since the window size is just three bytes, we can only send three bytes at
+  // a time.  The stream thread will then be blocked.
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("foo"), FlagFinIs(false)))).InSequence(s1)
+      .WillOnce(Invoke(this, &SpdySessionTestBase::ReceiveAndClose));;
+  // At this point, we're blocked on flow control, and the test will close the
+  // input side of the connection.  Since the stream can never complete, the
+  // session should abort the stream and shut down, rather than staying blocked
+  // forever.
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_OK)))
+      .InSequence(s1);
+
+  session_->Run();
+}
+
+// Test that we send GOAWAY if the client tries to send
+// SETTINGS_INITIAL_WINDOW_SIZE with a value of 0.
+TEST_P(SpdySessionFlowControlTest, SendGoawayForTooSmallInitialWindowSize) {
+  net::SettingsMap settings;
+  settings[net::SETTINGS_INITIAL_WINDOW_SIZE] =
+      std::make_pair(net::SETTINGS_FLAG_NONE, 0);
+  scoped_ptr<net::SpdyFrame> frame(framer_.CreateSettings(settings));
+  PushFrame(*frame);
+
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_PROTOCOL_ERROR)));
+
+  session_->Run();
+}
+
+// Test that we send GOAWAY if the client tries to send
+// SETTINGS_INITIAL_WINDOW_SIZE with a value of 0x80000000.
+TEST_P(SpdySessionFlowControlTest, SendGoawayForTooLargeInitialWindowSize) {
+  net::SettingsMap settings;
+  settings[net::SETTINGS_INITIAL_WINDOW_SIZE] =
+      std::make_pair(net::SETTINGS_FLAG_NONE, 0x80000000);
+  scoped_ptr<net::SpdyFrame> frame(framer_.CreateSettings(settings));
+  PushFrame(*frame);
+
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_PROTOCOL_ERROR)));
+
+  session_->Run();
+}
+
+// Only run flow control tests for SPDY v3.
+INSTANTIATE_TEST_CASE_P(Spdy3, SpdySessionFlowControlTest, testing::Values(
+    mod_spdy::spdy::SPDY_VERSION_3, mod_spdy::spdy::SPDY_VERSION_3_1));
+
+// Create a type alias so that we can instantiate some of our
+// SpdySessionTest-based tests using a different set of parameters.
+typedef SpdySessionTest SpdySessionServerPushTest;
+
+TEST_P(SpdySessionServerPushTest, SimpleServerPush) {
+  executor_.set_run_on_add(true);
+  const net::SpdyStreamId stream_id = 3;
+  const net::SpdyPriority priority = 2;
+  PushSynStreamFrame(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .WillOnce(WithArg<0>(Invoke(
+          this, &SpdySessionTest::ResponseWithServerPush)));
+  // We should right away create the server push task, and get the SYN_STREAM
+  // before any other frames from the original stream.
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(2u)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id,
+                     Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(3u)))))
+      .WillOnce(WithArg<0>(Invoke(FakeStreamTask::SimpleResponse)));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::SYN_STREAM),
+            StreamIdIs(2u), AssociatedStreamIdIs(stream_id),
+            FlagFinIs(false), FlagUnidirectionalIs(true))));
+  // The pushed stream has a low priority, so the rest of the first stream
+  // should get sent before the rest of the pushed stream.
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::SYN_REPLY), StreamIdIs(stream_id),
+            FlagFinIs(false))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("foobar"), StreamIdIs(stream_id),
+            FlagFinIs(false))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("quux"), StreamIdIs(stream_id), FlagFinIs(true))));
+  // Now we should get the rest of the pushed stream.
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::HEADERS), StreamIdIs(2u),
+            FlagFinIs(false))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("foobar"), StreamIdIs(2u), FlagFinIs(false))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("quux"), StreamIdIs(2u), FlagFinIs(true))))
+      .WillOnce(Invoke(this, &SpdySessionTestBase::ReceiveAndClose));
+  // And, we're done.
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_OK)));
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+TEST_P(SpdySessionServerPushTest, TooManyConcurrentPushes) {
+  MockStreamTask* task1 = new MockStreamTask;
+  MockStreamTask* task2 = new MockStreamTask;
+  MockStreamTask* task3 = new MockStreamTask;
+  executor_.set_run_on_add(false);
+  const net::SpdyStreamId stream_id = 9;
+  const net::SpdyPriority priority = 0;
+  PushSettingsFrame(net::SETTINGS_MAX_CONCURRENT_STREAMS, 2);
+  PushSynStreamFrame(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  EXPECT_CALL(session_io_, IsConnectionAborted()).Times(AtLeast(3));
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+      .Times(AtLeast(3));
+
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(IsControlFrameOfType(net::SETTINGS)));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .WillOnce(ReturnMockTask(task1));
+  EXPECT_CALL(session_io_, IsConnectionAborted())
+      .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunOne),
+                      Return(false)));
+  EXPECT_CALL(*task1, Run()).WillOnce(DoAll(
+      StartServerPush(&session_, stream_id, 3u, "/foo.css",
+          mod_spdy::SpdyServerPushInterface::PUSH_STARTED),
+      StartServerPush(&session_, stream_id, 2u, "/bar.css",
+          mod_spdy::SpdyServerPushInterface::PUSH_STARTED),
+      StartServerPush(&session_, stream_id, 1u, "/baz.css",
+          mod_spdy::SpdyServerPushInterface::TOO_MANY_CONCURRENT_PUSHES),
+      SendResponseHeaders(task1), SendDataFrame(task1, "html", true)));
+  // Start the first two pushes.  The third push should fail due to too many
+  // concurrent pushes.
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(2u)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id,
+                     Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(3u)))))
+      .WillOnce(ReturnMockTask(task2));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(4u)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id,
+                     Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(2u)))))
+      .WillOnce(ReturnMockTask(task3));
+  // Now we get the SYN_STREAMs for the pushed streams before anything else.
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::SYN_STREAM),
+            StreamIdIs(2u), AssociatedStreamIdIs(stream_id), PriorityIs(3u),
+            FlagFinIs(false), FlagUnidirectionalIs(true))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::SYN_STREAM),
+            StreamIdIs(4u), AssociatedStreamIdIs(stream_id), PriorityIs(2u),
+            FlagFinIs(false), FlagUnidirectionalIs(true))));
+  // We now send the frames from the original stream.
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::SYN_REPLY), StreamIdIs(stream_id),
+            FlagFinIs(false))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("html"), StreamIdIs(stream_id),
+            FlagFinIs(true))));
+  // At this point, the client will change MAX_CONCURRENT_STREAMS to zero.  We
+  // shouldn't barf, even though we have more active push streams than the new
+  // maximum.
+  EXPECT_CALL(session_io_, IsConnectionAborted())
+      .WillOnce(DoAll(
+          ReceiveSettingsFrame(this, net::SETTINGS_MAX_CONCURRENT_STREAMS, 0u),
+          Return(false)));
+  // Now let's run the rest of the tasks.  One of them will try to start yet
+  // another server push, but that should fail because MAX_CONCURRENT_STREAMS
+  // is now zero.
+  EXPECT_CALL(session_io_, IsConnectionAborted())
+      .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunAll),
+                      Return(false)));
+  EXPECT_CALL(*task2, Run()).WillOnce(DoAll(
+      SendResponseHeaders(task2), SendDataFrame(task2, "foo", true)));
+  EXPECT_CALL(*task3, Run()).WillOnce(DoAll(
+      StartServerPush(&session_, 4u, 3u, "/stuff.png",
+          mod_spdy::SpdyServerPushInterface::TOO_MANY_CONCURRENT_PUSHES),
+      SendResponseHeaders(task3), SendDataFrame(task3, "bar", true)));
+  // And now we get all those frames.  The "bar" stream's frames should come
+  // first, because that's a higher-priority stream.
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::HEADERS), StreamIdIs(4u))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("bar"), StreamIdIs(4u), FlagFinIs(true))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsControlFrameOfType(net::HEADERS), StreamIdIs(2u))));
+  EXPECT_CALL(session_io_, SendFrameRaw(
+      AllOf(IsDataFrameWith("foo"), StreamIdIs(2u), FlagFinIs(true))))
+      .WillOnce(Invoke(this, &SpdySessionTestBase::ReceiveAndClose));
+  // And, we're done.
+  EXPECT_CALL(session_io_, SendFrameRaw(IsGoAway(net::GOAWAY_OK)));
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Only run server push tests for SPDY v3.
+INSTANTIATE_TEST_CASE_P(Spdy3, SpdySessionServerPushTest, testing::Values(
+    mod_spdy::spdy::SPDY_VERSION_3, mod_spdy::spdy::SPDY_VERSION_3_1));
+
+}  // namespace

Added: httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream.cc?rev=1591622&view=auto
==============================================================================
--- httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream.cc (added)
+++ httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream.cc Thu May  1 11:43:36 2014
@@ -0,0 +1,395 @@
+// Copyright 2010 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_stream.h"
+
+
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "mod_spdy/common/spdy_frame_priority_queue.h"
+#include "mod_spdy/common/spdy_frame_queue.h"
+
+namespace {
+
+// The smallest WINDOW_UPDATE delta we're willing to send.  If the client sends
+// us less than this much data, we wait for more data before sending a
+// WINDOW_UPDATE frame (so that we don't end up sending lots of little ones).
+const size_t kMinWindowUpdateSize =
+    static_cast<size_t>(net::kSpdyStreamInitialWindowSize) / 8;
+
+}  // namespace
+
+namespace mod_spdy {
+
+SpdyStream::SpdyStream(spdy::SpdyVersion spdy_version,
+                       net::SpdyStreamId stream_id,
+                       net::SpdyStreamId associated_stream_id,
+                       int32 server_push_depth,
+                       net::SpdyPriority priority,
+                       int32 initial_output_window_size,
+                       SpdyFramePriorityQueue* output_queue,
+                       net::BufferedSpdyFramer* framer,
+                       SpdyServerPushInterface* pusher)
+    : spdy_version_(spdy_version),
+      stream_id_(stream_id),
+      associated_stream_id_(associated_stream_id),
+      server_push_depth_(server_push_depth),
+      priority_(priority),
+      output_queue_(output_queue),
+      framer_(framer),
+      pusher_(pusher),
+      condvar_(&lock_),
+      aborted_(false),
+      output_window_size_(initial_output_window_size),
+      // TODO(mdsteele): Make our initial input window size configurable (we
+      //   would send the chosen value to the client with a SETTINGS frame).
+      input_window_size_(net::kSpdyStreamInitialWindowSize),
+      input_bytes_consumed_(0) {
+  DCHECK_NE(spdy::SPDY_VERSION_NONE, spdy_version);
+  DCHECK(output_queue_);
+  DCHECK(framer_);
+  DCHECK(pusher_);
+  DCHECK_GT(output_window_size_, 0);
+  // In SPDY v2, priorities are in the range 0-3; in SPDY v3, they are 0-7.
+  DCHECK_GE(priority, 0u);
+  DCHECK_LE(priority, LowestSpdyPriorityForVersion(spdy_version));
+}
+
+SpdyStream::~SpdyStream() {}
+
+bool SpdyStream::is_server_push() const {
+  // By the SPDY spec, a stream has an even stream ID if and only if it was
+  // initiated by the server.
+  return stream_id_ % 2 == 0;
+}
+
+bool SpdyStream::is_aborted() const {
+  base::AutoLock autolock(lock_);
+  return aborted_;
+}
+
+void SpdyStream::AbortSilently() {
+  base::AutoLock autolock(lock_);
+  InternalAbortSilently();
+}
+
+void SpdyStream::AbortWithRstStream(net::SpdyStatusCodes status) {
+  base::AutoLock autolock(lock_);
+  InternalAbortWithRstStream(status);
+}
+
+int32 SpdyStream::current_input_window_size() const {
+  base::AutoLock autolock(lock_);
+  DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+  return input_window_size_;
+}
+
+int32 SpdyStream::current_output_window_size() const {
+  base::AutoLock autolock(lock_);
+  DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+  return output_window_size_;
+}
+
+void SpdyStream::OnInputDataConsumed(size_t size) {
+  // Sanity check: there is no input data to absorb for a server push stream,
+  // so we should only be getting called for client-initiated streams.
+  DCHECK(!is_server_push());
+
+  // Flow control only exists for SPDY v3 and up, so for SPDY v2 we don't need
+  // to bother tracking this.
+  if (spdy_version() < spdy::SPDY_VERSION_3) {
+    return;
+  }
+
+  // If the size arg is zero, this method should be a no-op, so just quit now.
+  if (size == 0) {
+    return;
+  }
+
+  base::AutoLock autolock(lock_);
+
+  // Don't bother with any of this if the stream has been aborted.
+  if (aborted_) {
+    return;
+  }
+
+  // Make sure the current input window size is sane.  Although there are
+  // provisions in the SPDY spec that allow the window size to be temporarily
+  // negative, or to go above its default initial size, with our current
+  // implementation that should never happen.  Once we make the initial input
+  // window size configurable, we may need to adjust or remove these checks.
+  DCHECK_GE(input_window_size_, 0);
+  DCHECK_LE(input_window_size_, net::kSpdyStreamInitialWindowSize);
+
+  // Add the newly consumed data to the total.  Assuming our caller is behaving
+  // well (even if the client isn't) -- that is, they are only consuming as
+  // much data as we have put into the input queue -- there should be no
+  // overflow here, and the new value should be at most the amount of
+  // un-WINDOW_UPDATE-ed data we've received.  The reason we can be sure of
+  // this is that PostInputFrame() refuses to put more data into the queue than
+  // the window size allows, and aborts the stream if the client tries.
+  input_bytes_consumed_ += size;
+  DCHECK_GE(input_bytes_consumed_, size);
+  DCHECK_LE(input_bytes_consumed_,
+            static_cast<size_t>(net::kSpdyStreamInitialWindowSize -
+                                input_window_size_));
+
+  // We don't want to send lots of little WINDOW_UPDATE frames (as that would
+  // waste bandwidth), so only bother sending one once it would have a
+  // reasonably large value.
+  // TODO(mdsteele): Consider also tracking whether we have received a FLAG_FIN
+  //   on this stream; once we've gotten FLAG_FIN, there will be no more data,
+  //   so we don't need to send any more WINDOW_UPDATE frames.
+  if (input_bytes_consumed_ < kMinWindowUpdateSize) {
+    return;
+  }
+
+  // The SPDY spec forbids sending WINDOW_UPDATE frames with a non-positive
+  // delta-window-size (SPDY draft 3 section 2.6.8).  But since we already
+  // checked above that size was positive, input_bytes_consumed_ should now be
+  // positive as well.
+  DCHECK_GT(input_bytes_consumed_, 0u);
+  // Make sure there won't be any overflow shenanigans.
+  COMPILE_ASSERT(sizeof(size_t) >= sizeof(net::kSpdyStreamMaximumWindowSize),
+                 size_t_is_at_least_32_bits);
+  DCHECK_LE(input_bytes_consumed_,
+            static_cast<size_t>(net::kSpdyStreamMaximumWindowSize));
+
+  // Send a WINDOW_UPDATE frame to the client and update our window size.
+  SendOutputFrame(framer_->CreateWindowUpdate(
+      stream_id_, static_cast<uint32>(input_bytes_consumed_)));
+  input_window_size_ += input_bytes_consumed_;
+  DCHECK_LE(input_window_size_, net::kSpdyStreamInitialWindowSize);
+  input_bytes_consumed_ = 0;
+}
+
+void SpdyStream::AdjustOutputWindowSize(int32 delta) {
+  base::AutoLock autolock(lock_);
+
+  // Flow control only exists for SPDY v3 and up.
+  DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+
+  if (aborted_) {
+    return;
+  }
+
+  // Check for overflow; if it happens, abort the stream (which will wake up
+  // any blocked threads).  Note that although delta is usually positive, it
+  // can also be negative, so we check for both overflow and underflow.
+  const int64 new_size =
+      static_cast<int64>(output_window_size_) + static_cast<int64>(delta);
+  if (new_size > static_cast<int64>(net::kSpdyStreamMaximumWindowSize) ||
+      new_size < -static_cast<int64>(net::kSpdyStreamMaximumWindowSize)) {
+    InternalAbortWithRstStream(net::FLOW_CONTROL_ERROR);
+    return;
+  }
+
+  // Update the window size.
+  const int32 old_size = output_window_size_;
+  output_window_size_ = static_cast<int32>(new_size);
+
+  // If the window size is newly positive, wake up any blocked threads.
+  if (old_size <= 0 && output_window_size_ > 0) {
+    condvar_.Broadcast();
+  }
+}
+
+void SpdyStream::PostInputFrame(net::SpdyFrame* frame_ptr) {
+  base::AutoLock autolock(lock_);
+
+  // Take ownership of the frame, so it will get deleted if we return early.
+  scoped_ptr<net::SpdyFrame> frame(frame_ptr);
+
+  // Once a stream has been aborted, nothing more goes into the queue.
+  if (aborted_) {
+    return;
+  }
+
+  // If this is a data frame (and we're using SPDY v3 or above) we need to
+  // track flow control.
+  if (!frame->is_control_frame() && spdy_version() >= spdy::SPDY_VERSION_3) {
+    DCHECK_GE(input_window_size_, 0);
+    const int32 size = frame->length();
+    // If receiving this much data would overflow the window size, then abort
+    // the stream with a flow control error.
+    if (size > input_window_size_) {
+      LOG(WARNING) << "Client violated flow control by sending too much data "
+                   << "to stream " << stream_id_ << ".  Aborting stream.";
+      InternalAbortWithRstStream(net::FLOW_CONTROL_ERROR);
+      return;  // Quit without posting the frame to the queue.
+    }
+    // Otherwise, decrease the window size.  It will be increased again once
+    // the data has been comsumed (by OnInputDataConsumed()).
+    else {
+      input_window_size_ -= size;
+    }
+  }
+
+  // Now that we've decreased the window size as necessary, we can make the
+  // frame available for consumption by the stream thread.
+  input_queue_.Insert(frame.release());
+}
+
+bool SpdyStream::GetInputFrame(bool block, net::SpdyFrame** frame) {
+  return input_queue_.Pop(block, frame);
+}
+
+void SpdyStream::SendOutputSynStream(const net::SpdyHeaderBlock& headers,
+                                     bool flag_fin) {
+  DCHECK(is_server_push());
+  base::AutoLock autolock(lock_);
+  if (aborted_) {
+    return;
+  }
+
+  const net::SpdyControlFlags flags = static_cast<net::SpdyControlFlags>(
+      (flag_fin ? net::CONTROL_FLAG_FIN : net::CONTROL_FLAG_NONE) |
+      net::CONTROL_FLAG_UNIDIRECTIONAL);
+  // Don't compress the headers in the frame here; it will be compressed later
+  // by the master connection (which maintains the shared header compression
+  // state for all streams).  We need to send this SYN_STREAM right away,
+  // before any more frames on the associated stream are sent, to ensure that
+  // the pushed stream gets started while the associated stream is still open,
+  // so we insert this frame with kTopPriority.
+  output_queue_->Insert(
+      SpdyFramePriorityQueue::kTopPriority, framer_->CreateSynStream(
+          stream_id_, associated_stream_id_, priority_,
+          0,  // 0 = no credential slot
+          flags,
+          false,  // false = uncompressed
+          &headers));
+}
+
+void SpdyStream::SendOutputSynReply(const net::SpdyHeaderBlock& headers,
+                                    bool flag_fin) {
+  DCHECK(!is_server_push());
+  base::AutoLock autolock(lock_);
+  if (aborted_) {
+    return;
+  }
+
+  const net::SpdyControlFlags flags =
+      flag_fin ? net::CONTROL_FLAG_FIN : net::CONTROL_FLAG_NONE;
+  // Don't compress the headers in the frame here; it will be compressed later
+  // by the master connection (which maintains the shared header compression
+  // state for all streams).
+  SendOutputFrame(framer_->CreateSynReply(
+      stream_id_, flags,
+      false,  // false = uncompressed
+      &headers));
+}
+
+void SpdyStream::SendOutputHeaders(const net::SpdyHeaderBlock& headers,
+                                   bool flag_fin) {
+  base::AutoLock autolock(lock_);
+  if (aborted_) {
+    return;
+  }
+
+  const net::SpdyControlFlags flags =
+      flag_fin ? net::CONTROL_FLAG_FIN : net::CONTROL_FLAG_NONE;
+  // Don't compress the headers in the frame here; it will be compressed later
+  // by the master connection (which maintains the shared header compression
+  // state for all streams).
+  SendOutputFrame(framer_->CreateHeaders(
+      stream_id_, flags,
+      false,  // false = uncompressed
+      &headers));
+}
+
+void SpdyStream::SendOutputDataFrame(base::StringPiece data, bool flag_fin) {
+  base::AutoLock autolock(lock_);
+  if (aborted_) {
+    return;
+  }
+
+  // Flow control only exists for SPDY v3 and up; for SPDY v2, we can just send
+  // the data without regard to the window size.  Even with flow control, we
+  // can of course send empty DATA frames at will.
+  if (spdy_version() < spdy::SPDY_VERSION_3 || data.empty()) {
+    // Suppress empty DATA frames (unless we're setting FLAG_FIN).
+    if (!data.empty() || flag_fin) {
+      const net::SpdyDataFlags flags =
+          flag_fin ? net::DATA_FLAG_FIN : net::DATA_FLAG_NONE;
+      SendOutputFrame(framer_->CreateDataFrame(
+          stream_id_, data.data(), data.size(), flags));
+    }
+    return;
+  }
+
+  while (!data.empty()) {
+    // If the current window size is non-positive, we must wait to send data
+    // until the client increases it (or we abort).  Note that the window size
+    // can be negative if the client decreased the maximum window size (with a
+    // SETTINGS frame) after we already sent data (SPDY draft 3 section 2.6.8).
+    while (!aborted_ && output_window_size_ <= 0) {
+      condvar_.Wait();
+    }
+    if (aborted_) {
+      return;
+    }
+
+    // If the current window size is less than the amount of data we'd like to
+    // send, send a smaller data frame with the first part of the data, and
+    // then we'll sleep until the window size is increased before sending the
+    // rest.
+    DCHECK_GT(output_window_size_, 0);
+    const size_t length = std::min(
+        data.size(), static_cast<size_t>(output_window_size_));
+    const net::SpdyDataFlags flags =
+        flag_fin && length == data.size() ?
+        net::DATA_FLAG_FIN : net::DATA_FLAG_NONE;
+    SendOutputFrame(framer_->CreateDataFrame(
+        stream_id_, data.data(), length, flags));
+    data = data.substr(length);
+    output_window_size_ -= length;
+    DCHECK_GE(output_window_size_, 0);
+  }
+}
+
+SpdyServerPushInterface::PushStatus SpdyStream::StartServerPush(
+    net::SpdyPriority priority,
+    const net::SpdyHeaderBlock& request_headers) {
+  DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+  return pusher_->StartServerPush(stream_id_, server_push_depth_ + 1, priority,
+                                  request_headers);
+}
+
+void SpdyStream::SendOutputFrame(net::SpdyFrame* frame) {
+  lock_.AssertAcquired();
+  DCHECK(!aborted_);
+  output_queue_->Insert(static_cast<int>(priority_), frame);
+}
+
+void SpdyStream::InternalAbortSilently() {
+  lock_.AssertAcquired();
+  input_queue_.Abort();
+  aborted_ = true;
+  condvar_.Broadcast();
+}
+
+void SpdyStream::InternalAbortWithRstStream(net::SpdyStatusCodes status) {
+  lock_.AssertAcquired();
+  output_queue_->Insert(SpdyFramePriorityQueue::kTopPriority,
+                        framer_->CreateRstStream(stream_id_, status));
+  // InternalAbortSilently will set aborted_ to true, which will prevent the
+  // stream thread from sending any more frames on this stream after the
+  // RST_STREAM.
+  InternalAbortSilently();
+}
+
+}  // namespace mod_spdy

Added: httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream.h?rev=1591622&view=auto
==============================================================================
--- httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream.h (added)
+++ httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream.h Thu May  1 11:43:36 2014
@@ -0,0 +1,192 @@
+// Copyright 2010 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_STREAM_H_
+#define MOD_SPDY_COMMON_SPDY_STREAM_H_
+
+#include "base/basictypes.h"
+#include "base/string_piece.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "net/spdy/buffered_spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "mod_spdy/common/spdy_frame_queue.h"
+#include "mod_spdy/common/spdy_server_push_interface.h"
+
+namespace mod_spdy {
+
+class SpdyFramePriorityQueue;
+
+// Represents one stream of a SPDY connection.  This class is used to
+// coordinate and pass SPDY frames between the SPDY-to-HTTP filter, the
+// HTTP-to-SPDY filter, and the master SPDY connection thread.  This class is
+// thread-safe, and in particular can be used concurrently by the stream thread
+// and the connection thread (although certain methods are meant to only ever
+// be called by one thread or the other; see the doc comments).
+class SpdyStream {
+ public:
+  // The SpdyStream object does *not* take ownership of any of these arguments.
+  // The BufferedSpdyFramer object is used only for creating uncompressed
+  // frames; its state will never by modified by the SpdyStream (unfortunately,
+  // however, we do need to call some non-const methods on it that don't
+  // actually mutate state, so we require a non-const pointer here).
+  SpdyStream(spdy::SpdyVersion spdy_version,
+             net::SpdyStreamId stream_id,
+             net::SpdyStreamId associated_stream_id_,
+             int32 server_push_depth,
+             net::SpdyPriority priority,
+             int32 initial_output_window_size,
+             SpdyFramePriorityQueue* output_queue,
+             net::BufferedSpdyFramer* framer,
+             SpdyServerPushInterface* pusher);
+  ~SpdyStream();
+
+  // What version of SPDY is being used for this connection?
+  spdy::SpdyVersion spdy_version() const { return spdy_version_; }
+
+  // Return true if this stream was initiated by the server, false if it was
+  // initiated by the client.
+  bool is_server_push() const;
+
+  // Get the ID for this SPDY stream.
+  net::SpdyStreamId stream_id() const { return stream_id_; }
+
+  // Get the ID for the SPDY stream with which this one is associated.  By the
+  // SPDY spec, if there is no associated stream, this will be zero.
+  net::SpdyStreamId associated_stream_id() const {
+    return associated_stream_id_;
+  }
+
+  // Get the current depth of the stream. 0 if this is a client initiated
+  // stream or associated_stream.depth+1 if this stream was created as
+  // a result of another stream.
+  int32 server_push_depth() const { return server_push_depth_; }
+
+  // Get the priority of this stream.
+  net::SpdyPriority priority() const { return priority_; }
+
+  // Return true if this stream has been aborted and should shut down.
+  bool is_aborted() const;
+
+  // Abort this stream.  This method returns immediately, and the thread
+  // running the stream will stop as soon as possible (if it is currently
+  // blocked on the window size, it will be woken up).
+  void AbortSilently();
+
+  // Same as AbortSilently, but also sends a RST_STREAM frame for this stream.
+  void AbortWithRstStream(net::SpdyStatusCodes status);
+
+  // What are the current window sizes for this stream?  These are mostly
+  // useful for debugging.  Requires that spdy_version() >= 3.
+  int32 current_input_window_size() const;
+  int32 current_output_window_size() const;
+
+  // This should be called by the stream thread for each chunk of input data
+  // that it consumes.  The SpdyStream object will take care of sending
+  // WINDOW_UPDATE frames as appropriate (automatically bunching up smaller,
+  // chunks to avoid sending too many frames, and of course not sending
+  // WINDOW_UPDATE frames for SPDY/2 connections).  The connection thread must
+  // not call this method.
+  void OnInputDataConsumed(size_t size);
+
+  // This should be called by the connection thread to adjust the window size,
+  // either due to receiving a WINDOW_UPDATE frame from the client, or from the
+  // client changing the initial window size with a SETTINGS frame.  The delta
+  // argument will usually be positive (WINDOW_UPDATE is always positive), but
+  // *can* be negative (if the client reduces the window size with SETTINGS).
+  //
+  // This method should *not* be called by the stream thread; the SpdyStream
+  // object will automatically take care of decreasing the window size for sent
+  // data.
+  void AdjustOutputWindowSize(int32 delta);
+
+  // Provide a SPDY frame sent from the client.  This is to be called from the
+  // master connection thread.  This method takes ownership of the frame
+  // object.
+  void PostInputFrame(net::SpdyFrame* frame);
+
+  // Get a SPDY frame from the client and return true, or return false if no
+  // frame is available.  If the block argument is true and no frame is
+  // currently available, block until a frame becomes available or the stream
+  // is aborted.  This is to be called from the stream thread.  The caller
+  // gains ownership of the provided frame.
+  bool GetInputFrame(bool block, net::SpdyFrame** frame);
+
+  // Send a SYN_STREAM frame to the client for this stream.  This may only be
+  // called if is_server_push() is true.
+  void SendOutputSynStream(const net::SpdyHeaderBlock& headers, bool flag_fin);
+
+  // Send a SYN_REPLY frame to the client for this stream.  This may only be
+  // called if is_server_push() is false.
+  void SendOutputSynReply(const net::SpdyHeaderBlock& headers, bool flag_fin);
+
+  // Send a HEADERS frame to the client for this stream.
+  void SendOutputHeaders(const net::SpdyHeaderBlock& headers, bool flag_fin);
+
+  // Send a SPDY data frame to the client on this stream.
+  void SendOutputDataFrame(base::StringPiece data, bool flag_fin);
+
+  // Initiate a SPDY server push associated with this stream, roughly by
+  // pretending that the client sent a SYN_STREAM with the given headers.  To
+  // repeat: the headers argument is _not_ the headers that the server will
+  // send to the client, but rather the headers to _pretend_ that the client
+  // sent to the server.  Requires that spdy_version() >= 3.
+  SpdyServerPushInterface::PushStatus StartServerPush(
+      net::SpdyPriority priority,
+      const net::SpdyHeaderBlock& request_headers);
+
+ private:
+  // Send a SPDY frame to the client.  This is to be called from the stream
+  // thread.  This method takes ownership of the frame object.  Must be holding
+  // lock_ to call this method.
+  void SendOutputFrame(net::SpdyFrame* frame);
+
+  // Aborts the input queue, sets aborted_, and wakes up threads waiting on
+  // condvar_.  Must be holding lock_ to call this method.
+  void InternalAbortSilently();
+
+  // Like InternalAbortSilently, but also sends a RST_STREAM frame for this
+  // stream.  Must be holding lock_ to call this method.
+  void InternalAbortWithRstStream(net::SpdyStatusCodes status);
+
+  // These fields are all either constant or thread-safe, and do not require
+  // additional synchronization.  In the case of framer_, we are careful to
+  // only call certain methods, in a thread-safe way (even though some of those
+  // methods are marked as non-const).
+  const spdy::SpdyVersion spdy_version_;
+  const net::SpdyStreamId stream_id_;
+  const net::SpdyStreamId associated_stream_id_;
+  const int32 server_push_depth_;
+  const net::SpdyPriority priority_;
+  SpdyFrameQueue input_queue_;
+  SpdyFramePriorityQueue* const output_queue_;
+  net::BufferedSpdyFramer* const framer_;  // we call only thread-safe methods
+  SpdyServerPushInterface* const pusher_;
+
+  // The lock protects the fields below.  The above fields do not require
+  // additional synchronization.
+  mutable base::Lock lock_;
+  base::ConditionVariable condvar_;
+  bool aborted_;
+  int32 output_window_size_;
+  int32 input_window_size_;
+  size_t input_bytes_consumed_;  // consumed since we last sent a WINDOW_UPDATE
+
+  DISALLOW_COPY_AND_ASSIGN(SpdyStream);
+};
+
+}  // namespace mod_spdy
+
+#endif  // MOD_SPDY_COMMON_SPDY_STREAM_H_

Propchange: httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream_task_factory.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream_task_factory.cc?rev=1591622&view=auto
==============================================================================
--- httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream_task_factory.cc (added)
+++ httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream_task_factory.cc Thu May  1 11:43:36 2014
@@ -0,0 +1,23 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_stream_task_factory.h"
+
+namespace mod_spdy {
+
+SpdyStreamTaskFactory::SpdyStreamTaskFactory() {}
+
+SpdyStreamTaskFactory::~SpdyStreamTaskFactory() {}
+
+}  // namespace mod_spdy

Added: httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream_task_factory.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream_task_factory.h?rev=1591622&view=auto
==============================================================================
--- httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream_task_factory.h (added)
+++ httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream_task_factory.h Thu May  1 11:43:36 2014
@@ -0,0 +1,49 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_STREAM_TASK_FACTORY_H_
+#define MOD_SPDY_COMMON_SPDY_STREAM_TASK_FACTORY_H_
+
+#include "base/basictypes.h"
+
+namespace net_instaweb { class Function; }
+
+namespace mod_spdy {
+
+class SpdyStream;
+
+// SpdyStreamTaskFactory is a helper interface for the SpdySession class.
+// The task factory generates tasks that take care of processing SPDY streams.
+// The task factory must not be deleted before all such tasks have been
+// disposed of (run or cancelled).
+class SpdyStreamTaskFactory {
+ public:
+  SpdyStreamTaskFactory();
+  virtual ~SpdyStreamTaskFactory();
+
+  // Create a new task to process the given stream.  Running the task should
+  // process the stream -- that is, pull frames off the stream's input queue
+  // and post frames to the stream's output queue -- and the task should not
+  // complete until the stream is completely finished.
+  //
+  // The implementation may assume that the factory will outlive the task.
+  virtual net_instaweb::Function* NewStreamTask(SpdyStream* stream) = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(SpdyStreamTaskFactory);
+};
+
+}  // namespace mod_spdy
+
+#endif  // MOD_SPDY_COMMON_SPDY_STREAM_TASK_FACTORY_H_

Propchange: httpd/mod_spdy/trunk/mod_spdy/common/spdy_stream_task_factory.h
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message