From commits-return-44327-archive-asf-public=cust-asf.ponee.io@qpid.apache.org Fri Mar 23 16:09:03 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 9D6CC18076D for ; Fri, 23 Mar 2018 16:09:02 +0100 (CET) Received: (qmail 43936 invoked by uid 500); 23 Mar 2018 15:08:56 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 43810 invoked by uid 99); 23 Mar 2018 15:08:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Mar 2018 15:08:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 54E8FF67A5; Fri, 23 Mar 2018 15:08:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aconway@apache.org To: commits@qpid.apache.org Date: Fri, 23 Mar 2018 15:09:02 -0000 Message-Id: In-Reply-To: <27947eb959d8484f8df4a8cb818295d5@git.apache.org> References: <27947eb959d8484f8df4a8cb818295d5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [8/8] qpid-proton git commit: PROTON-1778: [ruby] thread safe work_queue PROTON-1778: [ruby] thread safe work_queue Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/aa8d3727 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/aa8d3727 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/aa8d3727 Branch: refs/heads/master Commit: aa8d372723ec3fe8de0d16c165ed5944b65c40f0 Parents: 02f4955 Author: Alan Conway Authored: Thu Mar 22 17:19:45 2018 -0400 Committer: Alan Conway Committed: Fri Mar 23 11:08:27 2018 -0400 ---------------------------------------------------------------------- proton-c/bindings/ruby/lib/core/connection.rb | 4 ++ proton-c/bindings/ruby/lib/core/container.rb | 33 ++++++++---- proton-c/bindings/ruby/lib/core/endpoint.rb | 3 ++ proton-c/bindings/ruby/lib/core/work_queue.rb | 59 +++++++++++++++++++++ proton-c/bindings/ruby/lib/util/schedule.rb | 24 +++++++-- proton-c/bindings/ruby/tests/test_container.rb | 27 +++++++++- 6 files changed, 135 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/connection.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb index 327be10..c0d161e 100644 --- a/proton-c/bindings/ruby/lib/core/connection.rb +++ b/proton-c/bindings/ruby/lib/core/connection.rb @@ -288,6 +288,10 @@ module Qpid::Proton @link_prefix + "/" + (@link_count += 1).to_s(32) end + # @return [WorkQueue] work queue for code that should be run in the thread + # context for this connection + attr_reader :work_queue + protected def _local_condition http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/container.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb index c581d34..78f8013 100644 --- a/proton-c/bindings/ruby/lib/core/container.rb +++ b/proton-c/bindings/ruby/lib/core/container.rb @@ -19,6 +19,7 @@ require 'thread' require 'set' require_relative 'listener' +require_relative 'work_queue' module Qpid::Proton public @@ -193,7 +194,7 @@ module Qpid::Proton raise StoppedError if @stopped end while task = @work.pop - run_one task + run_one(task, Time.now) end raise @panic if @panic ensure @@ -242,23 +243,36 @@ module Qpid::Proton # Schedule code to be executed after a delay. # @param delay [Numeric] delay in seconds, must be >= 0 # @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed - def schedule(delay, &block) - delay >= 0.0 or raise ArgumentError, "delay=#{delay} must be >= 0" - block_given? or raise ArgumentError, "no block given" + # @return [void] + # @raise [ThreadError] if +non_block+ is true and the operation would block + def schedule(delay, non_block=false, &block) not_stopped - @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, &block) + @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, non_block, &block) @wake.wake end private + def wake() @wake.wake; end + # Container driver applies options and adds container context to events class ConnectionTask < Qpid::Proton::HandlerDriver + include TimeCompare + def initialize container, io, opts, server=false super io, opts[:handler] transport.set_server if server transport.apply opts connection.apply opts + @work_queue = WorkQueue.new container + connection.instance_variable_set(:@work_queue, @work_queue) + end + def next_tick() earliest(super, @work_queue.send(:next_tick)); end + def process(now) @work_queue.send(:process, now); super(); end + + def dispatch # Intercept dispatch to close work_queue + super + @work_queue.send(:close) if read_closed? && write_closed? end end @@ -341,7 +355,7 @@ module Qpid::Proton end # Handle a single item from the @work queue, this is the heart of the #run loop. - def run_one(task) + def run_one(task, now) case task when :start @@ -359,10 +373,9 @@ module Qpid::Proton end end - now = Time.now timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick r, w = IO.select(r, w, nil, timeout) - now = Time.now + now = Time.now unless timeout == 0 @wake.reset if r && r.delete(@wake) # selected is a Set to eliminate duplicates between r, w and next_tick due. @@ -387,7 +400,7 @@ module Qpid::Proton @work << :select # Enable next select when ConnectionTask then - maybe_panic { task.process } + maybe_panic { task.process now } rearm task when ListenTask then @@ -396,7 +409,7 @@ module Qpid::Proton rearm task when :schedule then - if maybe_panic { @schedule.process Time.now } + if maybe_panic { @schedule.process now } @lock.synchronize { @active -= 1; check_stop_lh } else @lock.synchronize { @schedule_working = false } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/endpoint.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/endpoint.rb b/proton-c/bindings/ruby/lib/core/endpoint.rb index 86d8efe..77372a0 100644 --- a/proton-c/bindings/ruby/lib/core/endpoint.rb +++ b/proton-c/bindings/ruby/lib/core/endpoint.rb @@ -67,6 +67,9 @@ module Qpid::Proton self.connection.transport end + # @return [WorkQueue] the work queue for work on this endpoint. + def work_queue() connection.work_queue; end + # @private # @return [Bool] true if {#state} has all the bits of `mask` set def check_state(mask) (self.state & mask) == mask; end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/work_queue.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/work_queue.rb b/proton-c/bindings/ruby/lib/core/work_queue.rb new file mode 100644 index 0000000..d08d883 --- /dev/null +++ b/proton-c/bindings/ruby/lib/core/work_queue.rb @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +module Qpid::Proton + + # A queue of work items to be executed, possibly in a different thread. + class WorkQueue + + # Add code to be executed by the WorkQueue immediately. + # @param non_block [Boolean] if true raise {ThreadError} if the operation would block. + # @yield [ ] the block will be invoked with no parameters in the {WorkQueue} context, + # which may be a different thread. + # @return [void] + # @raise [ThreadError] if +non_block+ is true and the operation would block + # @raise [EOFError] if the queue is closed and cannot accept more work + def add(non_block=false, &block) + @schedule.add(Time.at(0), non_block, &block) + @container.send :wake + end + + # Schedule work to be executed by the WorkQueue after a delay. + # Note that tasks scheduled after the WorkQueue closes will be silently dropped + # + # @param delay delay in seconds until the block is added to the queue. + # @param (see #add) + # @yield (see #add) + # @return [void] + # @raise (see #add) + def schedule(delay, non_block=false, &block) + @schedule.add(Time.now + delay, non_block, &block) + @container.send :wake + end + + private + + def initialize(container) + @schedule = Schedule.new + @container = container + end + + def close() @schedule.close; end + def process(now) @schedule.process(now); end + def next_tick() @schedule.next_tick; end + end +end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/util/schedule.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/util/schedule.rb b/proton-c/bindings/ruby/lib/util/schedule.rb index ef80c1b..f1bfb36 100644 --- a/proton-c/bindings/ruby/lib/util/schedule.rb +++ b/proton-c/bindings/ruby/lib/util/schedule.rb @@ -33,13 +33,23 @@ module Qpid::Proton include TimeCompare Item = Struct.new(:time, :proc) - def initialize() @lock = Mutex.new; @items = []; end + def initialize() + @lock = Mutex.new + @items = [] + @closed = false + end - def next_tick() @lock.synchronize { @items.empty? ? nil : @items.first.time } end + def next_tick() + @lock.synchronize { @items.first.time unless @items.empty? } + end # @return true if the Schedule was previously empty - def add(time, &proc) + # @raise EOFError if schedule is closed + # @raise ThreadError if +non_block+ and operation would block + def add(time, non_block=false, &proc) + # non_block ignored for now, but we may implement a bounded schedule in future. @lock.synchronize do + raise EOFError if @closed if at = (0...@items.size).bsearch { |i| @items[i].time > time } @items.insert(at, Item.new(time, proc)) else @@ -53,11 +63,17 @@ module Qpid::Proton def process(now) due = [] empty = @lock.synchronize do - due << @items.shift while !@items.empty? && before_eq(@items.first.time, now) + due << @items.shift while (!@items.empty? && before_eq(@items.first.time, now)) @items.empty? end due.each { |i| i.proc.call() } return empty && !due.empty? end + + # #add raises EOFError after #close. + # #process can still be called to drain the schedule. + def close() + @lock.synchronize { @closed = true } + end end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/tests/test_container.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb index c9f54cc..7f89205 100644 --- a/proton-c/bindings/ruby/tests/test_container.rb +++ b/proton-c/bindings/ruby/tests/test_container.rb @@ -336,7 +336,32 @@ class ContainerTest < MiniTest::Test delays.sort.each do |d| x = a.shift assert_equal d, x[0] - assert_in_delta start + d, x[1], 0.01, "#{d}" + assert_in_delta start + d, x[1], 0.01 end end + + def test_work_queue + cont = ServerContainer.new(__method__, {}, 1) + c = cont.connect(cont.url) + t = Thread.new { cont.run } + q = Queue.new + + start = Time.now + c.work_queue.schedule(0.02) { q << [3, Thread.current] } + c.work_queue.add { q << [1, Thread.current] } + c.work_queue.schedule(0.04) { q << [4, Thread.current] } + c.work_queue.add { q << [2, Thread.current] } + + assert_equal [1, t], q.pop + assert_equal [2, t], q.pop + assert_in_delta 0.0, Time.now - start, 0.01 + assert_equal [3, t], q.pop + assert_in_delta 0.02, Time.now - start, 0.01 + assert_equal [4, t], q.pop + assert_in_delta 0.04, Time.now - start, 0.01 + + c.work_queue.add { c.close } + t.join + assert_raises(EOFError) { c.work_queue.add { } } + end end --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org