couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [12/33] rexi commit: updated refs/heads/master to bbf59a2
Date Thu, 28 Aug 2014 12:23:22 GMT
Implement new stream2 API

This embeds the stream_init/1 logic into the stream functions so that we
don't have to maintain the logic for initializing the stream for all
clients.

BugzId: 24635


Project: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/commit/cae29fe1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/tree/cae29fe1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/diff/cae29fe1

Branch: refs/heads/master
Commit: cae29fe1926db6ec89645da63aee60766f14bd11
Parents: 7733957
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Mon Oct 28 16:03:07 2013 -0500
Committer: Robert Newson <rnewson@apache.org>
Committed: Wed Jul 23 17:58:32 2014 +0100

----------------------------------------------------------------------
 src/rexi.erl | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 59 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/cae29fe1/src/rexi.erl
----------------------------------------------------------------------
diff --git a/src/rexi.erl b/src/rexi.erl
index 20f582b..62f410b 100644
--- a/src/rexi.erl
+++ b/src/rexi.erl
@@ -18,6 +18,7 @@
 -export([stream_init/0, stream_init/1]).
 -export([stream_start/1, stream_cancel/1]).
 -export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]).
+-export([stream2/1, stream2/2, stream2/3, stream_last/1, stream_last/2]).
 
 -include_lib("rexi/include/rexi.hrl").
 
@@ -184,6 +185,43 @@ stream(Msg, Limit, Timeout) ->
         exit(timeout)
     end.
 
+%% @equiv stream2(Msg, 10, 300000)
+stream2(Msg) ->
+    stream2(Msg, 10, 300000).
+
+%% @equiv stream2(Msg, Limit, 300000)
+stream2(Msg, Limit) ->
+    stream2(Msg, Limit, 300000).
+
+%% @doc Stream a message back to the coordinator. It limits the
+%% number of unacked messages to Limit and exits with reason timeout
+%% if it doesn't receive an ack in Timeout milliseconds. This
+%% is a combination of the old stream_start and stream functions
+%% which automatically does the stream initialization logic.
+-spec stream2(any(), pos_integer(), pos_integer() | infinity) -> any().
+stream2(Msg, Limit, Timeout) ->
+    maybe_init_stream(Timeout),  % runs outside the try, so its exits propagate unchanged
+    try maybe_wait(Limit, Timeout) of
+        {ok, Count} ->
+            put(rexi_unacked, Count+1),  % one more message awaiting an ack
+            {Caller, Ref} = get(rexi_from),
+            erlang:send(Caller, {Ref, self(), Msg}),
+            ok
+    catch throw:timeout ->  % only a thrown timeout is converted into an exit
+        exit(timeout)
+    end.
+
+%% @equiv stream_last(Msg, 300000)
+stream_last(Msg) ->
+    stream_last(Msg, 300000).
+
+%% @doc Send the last message in a stream. The difference between
+%% this and stream is that it uses rexi:reply/1 which doesn't include
+%% the worker pid and doesn't wait for a response from the controller.
+stream_last(Msg, Timeout) ->
+    maybe_init_stream(Timeout),  % Timeout is used only for stream initialization
+    rexi:reply(Msg).
+
 %% @equiv stream_ack(Client, 1)
 stream_ack(Client) ->
     erlang:send(Client, {rexi_ack, 1}).
@@ -196,6 +234,27 @@ stream_ack(Client, N) ->
 
 cast_msg(Msg) -> {'$gen_cast', Msg}.
 
+maybe_init_stream(Timeout) ->
+    case get(rexi_STREAM_INITED) of  % process-dictionary flag set by init_stream/1
+        true ->
+            ok;  % already initialized: nothing to do
+        _ ->  % flag unset, or any value other than true
+            init_stream(Timeout)
+    end.
+
+init_stream(Timeout) ->
+    case sync_reply(rexi_STREAM_INIT, Timeout) of  % sync_reply/2 is defined outside this hunk
+        rexi_STREAM_START ->
+            put(rexi_STREAM_INITED, true),  % flag read by maybe_init_stream/1
+            ok;
+        rexi_STREAM_CANCEL ->
+            exit(normal);  % cancellation ends this process with reason normal
+        timeout ->
+            exit(timeout);
+        Else ->
+            exit({invalid_stream_message, Else})  % any other reply exits with the offending term
+    end.
+
 maybe_wait(Limit, Timeout) ->
     case get(rexi_unacked) of
         undefined ->


Mime
View raw message