couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vatam...@apache.org
Subject [couchdb] branch prototype/fdb-layer updated: CouchDB background jobs
Date Mon, 08 Jul 2019 18:42:45 GMT
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/prototype/fdb-layer by this push:
     new 753f3a7  CouchDB background jobs
753f3a7 is described below

commit 753f3a77c7c9bb7b4b199352a28ced8d2abf8330
Author: Nick Vatamaniuc <vatamane@apache.org>
AuthorDate: Wed Jun 12 16:11:56 2019 -0400

    CouchDB background jobs
    
    RFC: https://github.com/apache/couchdb-documentation/pull/409
    
    Main API is in the `couch_jobs` module. Additional description of internals is
    in the README.md file.
---
 rebar.config.script                                |   1 +
 rel/overlay/etc/default.ini                        |  21 +
 rel/reltool.config                                 |   2 +
 src/couch_jobs/.gitignore                          |   4 +
 src/couch_jobs/README.md                           |  62 ++
 src/couch_jobs/rebar.config                        |  14 +
 src/couch_jobs/src/couch_jobs.app.src              |  31 +
 src/couch_jobs/src/couch_jobs.erl                  | 378 ++++++++++++
 src/couch_jobs/src/couch_jobs.hrl                  |  52 ++
 src/couch_jobs/src/couch_jobs_activity_monitor.erl | 133 ++++
 .../src/couch_jobs_activity_monitor_sup.erl        |  64 ++
 src/couch_jobs/src/couch_jobs_app.erl              |  26 +
 src/couch_jobs/src/couch_jobs_fdb.erl              | 679 +++++++++++++++++++++
 src/couch_jobs/src/couch_jobs_notifier.erl         | 285 +++++++++
 src/couch_jobs/src/couch_jobs_notifier_sup.erl     |  64 ++
 src/couch_jobs/src/couch_jobs_pending.erl          | 143 +++++
 src/couch_jobs/src/couch_jobs_server.erl           | 193 ++++++
 src/couch_jobs/src/couch_jobs_sup.erl              |  66 ++
 src/couch_jobs/src/couch_jobs_type_monitor.erl     |  84 +++
 src/couch_jobs/test/couch_jobs_tests.erl           | 606 ++++++++++++++++++
 20 files changed, 2908 insertions(+)

diff --git a/rebar.config.script b/rebar.config.script
index 3f3ef46..116c040 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -84,6 +84,7 @@ SubDirs = [
     "src/couch_tests",
     "src/ddoc_cache",
     "src/fabric",
+    "src/couch_jobs",
     "src/global_changes",
     "src/mango",
     "src/rexi",
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 0d7ac6d..8fd2261 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -481,3 +481,24 @@ min_priority = 2.0
 
 [smoosh.ratio_views]
 min_priority = 2.0
+
+;[couch_jobs]
+;
+; Maximum jitter used when checking for active job timeouts
+;activity_monitor_max_jitter_msec = 10000
+;
+; Hold-off applied before notifying subscribers. Since active jobs can be
+; queried more efficiently using a range read, increasing this value should make
+; notifications more performant, however, it would also increase notification
+; latency.
+;type_monitor_holdoff_msec = 50
+;
+; Timeout used when waiting for the job type notification watches. The default
+; value of "infinity" should work well in most cases.
+;type_monitor_timeout_msec = infinity
+;
+; How often to check for the presence of new job types.
+;type_check_period_msec = 15000
+;
+; Jitter applied when checking for new job types.
+;type_check_max_jitter_msec = 5000
diff --git a/rel/reltool.config b/rel/reltool.config
index 1051d2e..7b2159d 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -33,6 +33,7 @@
         config,
         couch,
         couch_epi,
+        couch_jobs,
         couch_index,
         couch_log,
         couch_mrview,
@@ -90,6 +91,7 @@
     {app, config, [{incl_cond, include}]},
     {app, couch, [{incl_cond, include}]},
     {app, couch_epi, [{incl_cond, include}]},
+    {app, couch_jobs, [{incl_cond, include}]},
     {app, couch_index, [{incl_cond, include}]},
     {app, couch_log, [{incl_cond, include}]},
     {app, couch_mrview, [{incl_cond, include}]},
diff --git a/src/couch_jobs/.gitignore b/src/couch_jobs/.gitignore
new file mode 100644
index 0000000..6ef4c52
--- /dev/null
+++ b/src/couch_jobs/.gitignore
@@ -0,0 +1,4 @@
+*.beam
+.eunit
+ebin/couch_jobs.app
+.DS_Store
\ No newline at end of file
diff --git a/src/couch_jobs/README.md b/src/couch_jobs/README.md
new file mode 100644
index 0000000..bc45d32
--- /dev/null
+++ b/src/couch_jobs/README.md
@@ -0,0 +1,62 @@
+CouchDB Jobs Application
+========================
+
+Run background jobs in CouchDB
+
+Design (RFC) discussion: https://github.com/apache/couchdb-documentation/pull/409/files
+
+This is a description of some of the modules:
+
+ * `couch_jobs`: The main API module. It contains functions for creating,
+   accepting, executing, and monitoring jobs. A common pattern in this module
+   is to get a jobs transaction object (named `JTx` throughout the code), then
+   start a transaction and call a bunch of functions from `couch_jobs_fdb` in
+   that transaction.
+
+ * `couch_jobs_fdb`: This is a layer that talks to FDB. There is a lot of tuple
+   packing and unpacking, reading ranges and also managing transaction objects.
+
+ * `couch_jobs_pending`: This module implements the pending jobs queue. These
+   functions could all go in `couch_jobs_fdb` but the implementation was fairly
+   self-contained, with its own private helper functions, so it made sense to
+   move to a separate module.
+
+ * `couch_jobs_activity_monitor`: Here is where the "activity monitor"
+   functionality is implemented. That's done with a `gen_server` instance
+   running for each type. This `gen_server` periodically checks if there are
+   inactive jobs for its type, and if there are, it re-enqueues them. If the
+   timeout value changes, then it skips the pending check, until the new
+   timeout expires.
+
+ * `couch_jobs_activity_monitor_sup` : This is a simple one-for-one supervisor
+   to spawn `couch_jobs_activity_monitor` instances for each type.
+
+ * `couch_jobs_type_monitor` : This is a helper process meant to be
+   `spawn_link`-ed from a parent `gen_server`. It then monitors activity for a
+   particular job type. If any jobs of that type have an update it notifies the
+   parent process.
+
+ * `couch_jobs_notifier`: Is responsible for subscriptions. Just like
+   with activity monitor there is a `gen_server` instance running per
+   each type. It uses a linked `couch_jobs_type_monitor` process to wait for
+   any job updates. When an update notification arrives, it can efficiently
+   find out if any active jobs have been updated, by reading the `(?JOBS,
+   ?ACTIVITY, Type, Sequence)` range. That should account for the bulk of
+   changes. The jobs that are not active anymore, are queried individually.
+   Subscriptions are managed in an ordered set ETS table.
+
+ * `couch_jobs_notifier_sup`: A simple one-for-one supervisor to spawn
+   `couch_jobs_notifier` processes for each type.
+
+ * `couch_jobs_server`: This is a `gen_server` which keeps track of job
+   types. It then starts or stops activity monitors and notifiers for each
+   type. To do that it queries the `(?JOBS, ?ACTIVITY_TIMEOUT)` periodically.
+
+ * `couch_jobs_sup`: This is the main application supervisor. The restart
+   strategy is `rest_for_one`, meaning that when a child restarts, the
+   siblings following it will restart. One interesting entry there is the first
+   child which is used just to create an ETS table used by `couch_jobs_fdb` to
+   cache transaction object (`JTx` mentioned above). That child calls
+   `init_cache/0`, where it creates the ETS then returns with `ignore` so it
+   doesn't actually spawn a process. The ETS table will be owned by the
+   supervisor process.
diff --git a/src/couch_jobs/rebar.config b/src/couch_jobs/rebar.config
new file mode 100644
index 0000000..362c878
--- /dev/null
+++ b/src/couch_jobs/rebar.config
@@ -0,0 +1,14 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{cover_enabled, true}.
+{cover_print_enabled, true}.
diff --git a/src/couch_jobs/src/couch_jobs.app.src b/src/couch_jobs/src/couch_jobs.app.src
new file mode 100644
index 0000000..8ded14c
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.app.src
@@ -0,0 +1,31 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_jobs, [
+    {description, "CouchDB Jobs"},
+    {vsn, git},
+    {mod, {couch_jobs_app, []}},
+    {registered, [
+        couch_jobs_sup,
+        couch_jobs_activity_monitor_sup,
+        couch_jobs_notifier_sup,
+        couch_jobs_server
+    ]},
+    {applications, [
+        kernel,
+        stdlib,
+        erlfdb,
+        couch_log,
+        config,
+        fabric
+    ]}
+]}.
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
new file mode 100644
index 0000000..d469ed4
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -0,0 +1,378 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs).
+
+-export([
+    % Job creation
+    add/4,
+    add/5,
+    remove/3,
+    get_job_data/3,
+    get_job_state/3,
+
+    % Job processing
+    accept/1,
+    accept/2,
+    finish/2,
+    finish/3,
+    resubmit/2,
+    resubmit/3,
+    is_resubmitted/1,
+    update/2,
+    update/3,
+
+    % Subscriptions
+    subscribe/2,
+    subscribe/3,
+    unsubscribe/1,
+    wait/2,
+    wait/3,
+
+    % Type timeouts
+    set_type_timeout/2,
+    clear_type_timeout/1,
+    get_type_timeout/1
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-define(MIN_ACCEPT_WAIT_MSEC, 100).
+
+
+%% Job Creation API
+
+-spec add(jtx(), job_type(), job_id(), job_data()) -> ok | {error, any()}.
+add(Tx, Type, JobId, JobData) ->
+    add(Tx, Type, JobId, JobData, 0).
+
+
+-spec add(jtx(), job_type(), job_id(), job_data(), scheduled_time()) ->
+    ok | {error, any()}.
+add(Tx, Type, JobId, JobData, ScheduledTime) when is_binary(JobId),
+        is_map(JobData), is_integer(ScheduledTime) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        case couch_jobs_fdb:add(JTx, Type, JobId, JobData, ScheduledTime) of
+            {ok, _, _, _} -> ok;
+            {error, Error} -> {error, Error}
+        end
+    end).
+
+
+-spec remove(jtx(), job_type(), job_id()) -> ok | {error, any()}.
+remove(Tx, Type, JobId) when is_binary(JobId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:remove(JTx, job(Type, JobId))
+    end).
+
+
+-spec get_job_data(jtx(), job_type(), job_id()) -> {ok, job_data()} | {error,
+    any()}.
+get_job_data(Tx, Type, JobId) when is_binary(JobId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        case couch_jobs_fdb:get_job_state_and_data(JTx, job(Type, JobId)) of
+            {ok, _Seq, _State, Data} ->
+                {ok, couch_jobs_fdb:decode_data(Data)};
+            {error, Error} ->
+                {error, Error}
+        end
+    end).
+
+
+-spec get_job_state(jtx(), job_type(), job_id()) -> {ok, job_state()} | {error,
+    any()}.
+get_job_state(Tx, Type, JobId) when is_binary(JobId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        case couch_jobs_fdb:get_job_state_and_data(JTx, job(Type, JobId)) of
+            {ok, _Seq, State, _Data} ->
+                {ok, State};
+            {error, Error} ->
+                {error, Error}
+        end
+    end).
+
+
+%% Job processor API
+
+-spec accept(job_type()) -> {ok, job(), job_data()} | {error, any()}.
+accept(Type) ->
+    accept(Type, #{}).
+
+
+% Accept a pending job of Type. Opts: no_schedule, max_sched_time, timeout.
+-spec accept(job_type(), job_accept_opts()) ->
+    {ok, job(), job_data()} | {error, any()}.
+accept(Type, #{} = Opts) ->
+    NoSched = maps:get(no_schedule, Opts, false),
+    MaxSchedTimeDefault = case NoSched of
+        true -> 0;
+        false -> ?UNDEFINED_MAX_SCHEDULED_TIME
+    end,
+    MaxSchedTime = maps:get(max_sched_time, Opts, MaxSchedTimeDefault),
+    Timeout = maps:get(timeout, Opts, infinity),
+    case NoSched andalso MaxSchedTime =/= 0 of
+        true -> {error, no_schedule_require_0_max_sched_time};
+        false -> accept_loop(Type, NoSched, MaxSchedTime, Timeout)
+    end.
+
+
+-spec finish(jtx(), job()) -> ok | {error, any()}.
+finish(Tx, Job) ->
+    finish(Tx, Job, undefined).
+
+
+-spec finish(jtx(), job(), job_data()) -> ok | {error, any()}.
+finish(Tx, #{jlock := <<_/binary>>} = Job, JobData) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:finish(JTx, Job, JobData)
+    end).
+
+
+-spec resubmit(jtx(), job()) -> {ok, job()} | {error, any()}.
+resubmit(Tx, Job) ->
+    resubmit(Tx, Job, ?UNDEFINED_MAX_SCHEDULED_TIME).
+
+
+-spec resubmit(jtx(), job(), scheduled_time()) -> {ok, job()} | {error, any()}.
+resubmit(Tx, #{jlock := <<_/binary>>} = Job, SchedTime) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:resubmit(JTx, Job, SchedTime)
+    end).
+
+
+-spec is_resubmitted(job()) -> true | false.
+is_resubmitted(#{job := true} = Job) ->
+    maps:get(resubmit, Job, false).
+
+
+-spec update(jtx(), job()) -> {ok, job()} | {error, any()}.
+update(Tx, Job) ->
+    update(Tx, Job, undefined).
+
+
+-spec update(jtx(), job(), job_data()) -> {ok, job()} | {error, any()}.
+update(Tx, #{jlock := <<_/binary>>} = Job, JobData) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:update(JTx, Job, JobData)
+    end).
+
+
+%% Subscription API
+
+% Receive events as messages. Wait for them using `wait/2,3`
+% functions.
+%
+
+-spec subscribe(job_type(), job_id()) -> {ok, job_subscription(), job_state(),
+    job_data()} | {ok, finished, job_data()} | {error, any()}.
+subscribe(Type, JobId) ->
+    subscribe(undefined, Type, JobId).
+
+
+-spec subscribe(jtx(), job_type(), job_id()) -> {ok, job_subscription(),
+    job_state(), job_data()} | {ok, finished, job_data()} | {error, any()}.
+subscribe(Tx, Type, JobId) ->
+    StateData = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        Job = #{job => true, type => Type, id => JobId},
+        couch_jobs_fdb:get_job_state_and_data(JTx, Job)
+    end),
+    case StateData of
+        {ok, _Seq, finished, Data} ->
+            {ok, finished, couch_jobs_fdb:decode_data(Data)};
+        {ok, Seq, State, Data} ->
+            case couch_jobs_notifier:subscribe(Type, JobId, State, Seq) of
+                {ok, SubRef} ->
+                    Data1 = couch_jobs_fdb:decode_data(Data),
+                    {ok, SubRef, State, Data1};
+                {error, Error} ->
+                    {error, Error}
+            end;
+        {error, Error} ->
+            {error, Error}
+    end.
+
+
+% Unsubscribe from getting notifications based on a particular subscription.
+% Each subscription should be followed by its own unsubscription call. However,
+% subscriber processes are also monitored and auto-unsubscribed if they exit.
+% If subscribing process is exiting, calling this function is optional.
+%
+-spec unsubscribe(job_subscription()) -> ok.
+unsubscribe({Server, Ref}) when is_pid(Server), is_reference(Ref) ->
+    try
+        couch_jobs_notifier:unsubscribe(Server, Ref)
+    after
+        flush_notifications(Ref)
+    end.
+
+
+% Wait to receive job state updates
+%
+-spec wait(job_subscription() | [job_subscription()], timeout()) ->
+    {job_type(), job_id(), job_state(), job_data()} | timeout.
+wait({_, Ref}, Timeout) ->
+    receive
+        {?COUCH_JOBS_EVENT, Ref, Type, Id, State, Data} ->
+            {Type, Id, State, couch_jobs_fdb:decode_data(Data)}
+    after
+        Timeout -> timeout
+    end;
+
+wait(Subs, Timeout) when is_list(Subs) ->
+    {Result, ResendQ} = wait_any(Subs, Timeout, []),
+    lists:foreach(fun(Msg) -> self() ! Msg end, ResendQ),
+    Result.
+
+
+-spec wait(job_subscription() | [job_subscription()], job_state(), timeout())
+    -> {job_type(), job_id(), job_state(), job_data()} | timeout.
+wait({_, Ref} = Sub, State, Timeout) when is_atom(State) ->
+    receive
+        {?COUCH_JOBS_EVENT, Ref, Type, Id, MsgState, Data0} ->
+            case MsgState =:= State of
+                true ->
+                    Data = couch_jobs_fdb:decode_data(Data0),
+                    {Type, Id, State, Data};
+                false ->
+                    wait(Sub, State, Timeout)
+            end
+    after
+        Timeout -> timeout
+    end;
+
+wait(Subs, State, Timeout) when is_list(Subs),
+        is_atom(State) ->
+    {Result, ResendQ} = wait_any(Subs, State, Timeout, []),
+    lists:foreach(fun(Msg) -> self() ! Msg end, ResendQ),
+    Result.
+
+
+%% Job type timeout API
+
+% These functions manipulate the activity timeout for each job type.
+
+-spec set_type_timeout(job_type(), timeout()) -> ok.
+set_type_timeout(Type, Timeout) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:set_type_timeout(JTx, Type, Timeout)
+    end).
+
+
+-spec clear_type_timeout(job_type()) -> ok.
+clear_type_timeout(Type) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:clear_type_timeout(JTx, Type)
+    end).
+
+
+-spec get_type_timeout(job_type()) -> timeout().
+get_type_timeout(Type) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:get_type_timeout(JTx, Type)
+    end).
+
+
+%% Private utilities
+
+accept_loop(Type, NoSched, MaxSchedTime, Timeout) ->
+    TxFun =  fun(JTx) ->
+        couch_jobs_fdb:accept(JTx, Type, MaxSchedTime, NoSched)
+    end,
+    case couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), TxFun) of
+        {ok, Job, Data} ->
+            {ok, Job, Data};
+        {not_found, PendingWatch} ->
+            case wait_pending(PendingWatch, MaxSchedTime, Timeout) of
+                {error, not_found} ->
+                    {error, not_found};
+                ok ->
+                    accept_loop(Type, NoSched, MaxSchedTime, Timeout)
+            end
+    end.
+
+
+job(Type, JobId) ->
+    #{job => true, type => Type, id => JobId}.
+
+
+wait_pending(PendingWatch, _MaxSTime, 0) ->
+    erlfdb:cancel(PendingWatch, [flush]),
+    {error, not_found};
+
+wait_pending(PendingWatch, MaxSTime, UserTimeout) ->
+    NowMSec = erlang:system_time(millisecond),
+    Timeout0 = max(?MIN_ACCEPT_WAIT_MSEC, MaxSTime * 1000 - NowMSec),
+    Timeout = min(limit_timeout(Timeout0), UserTimeout),
+    try
+        erlfdb:wait(PendingWatch, [{timeout, Timeout}]),
+        ok
+    catch
+        error:{timeout, _} ->
+            erlfdb:cancel(PendingWatch, [flush]),
+            {error, not_found}
+    end.
+
+
+wait_any(Subs, Timeout0, ResendQ) when is_list(Subs) ->
+    Timeout = limit_timeout(Timeout0),
+    receive
+        {?COUCH_JOBS_EVENT, Ref, Type, Id, State, Data0} = Msg ->
+            case lists:keyfind(Ref, 2, Subs) of
+                false ->
+                    wait_any(Subs, Timeout, [Msg | ResendQ]);
+                {_, Ref} ->
+                    Data = couch_jobs_fdb:decode_data(Data0),
+                    {{Type, Id, State, Data}, ResendQ}
+            end
+    after
+        Timeout -> {timeout, ResendQ}
+    end.
+
+
+% Same as wait_any/3, but only an event whose state equals State is returned.
+% Events for refs not in Subs are collected in ResendQ; events for a
+% subscribed ref in another state are dropped. Every retry recurses into
+% wait_any/4 so the State filter stays in effect.
+wait_any(Subs, State, Timeout0, ResendQ) when is_list(Subs) ->
+    Timeout = limit_timeout(Timeout0),
+    receive
+        {?COUCH_JOBS_EVENT, Ref, Type, Id, MsgState, Data0} = Msg ->
+            case lists:keyfind(Ref, 2, Subs) of
+                false ->
+                    wait_any(Subs, State, Timeout, [Msg | ResendQ]);
+                {_, Ref} when MsgState =:= State ->
+                    Data = couch_jobs_fdb:decode_data(Data0),
+                    {{Type, Id, State, Data}, ResendQ};
+                {_, Ref} ->
+                    wait_any(Subs, State, Timeout, ResendQ)
+            end
+    after
+        Timeout -> {timeout, ResendQ}
+    end.
+
+
+limit_timeout(Timeout) when is_integer(Timeout), Timeout < 16#FFFFFFFF ->
+    Timeout;
+
+limit_timeout(_Timeout) ->
+    infinity.
+
+
+% Drop queued events for subscription Ref; events are 6-tuples (see wait/2).
+flush_notifications(Ref) ->
+    receive
+        {?COUCH_JOBS_EVENT, Ref, _, _, _, _} -> flush_notifications(Ref)
+    after
+        0 -> ok
+    end.
diff --git a/src/couch_jobs/src/couch_jobs.hrl b/src/couch_jobs/src/couch_jobs.hrl
new file mode 100644
index 0000000..2a02d76
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.hrl
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+% Job map/json field definitions
+%
+-define(OPT_PRIORITY, <<"priority">>).
+-define(OPT_DATA, <<"data">>).
+-define(OPT_CANCEL, <<"cancel">>).
+-define(OPT_RESUBMIT, <<"resubmit">>).
+
+% These might be in a fabric public hrl eventually
+%
+-define(uint2bin(I), binary:encode_unsigned(I, little)).
+-define(bin2uint(I), binary:decode_unsigned(I, little)).
+-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}).
+-define(METADATA_VERSION_KEY, <<"$metadata_version_key$">>).
+
+% Data model definitions
+%
+-define(JOBS, 51).  % coordinate with fabric2.hrl
+-define(DATA, 1).
+-define(PENDING, 2).
+-define(WATCHES_PENDING, 3).
+-define(WATCHES_ACTIVITY, 4).
+-define(ACTIVITY_TIMEOUT, 5).
+-define(ACTIVITY, 6).
+
+
+-define(COUCH_JOBS_EVENT, '$couch_jobs_event').
+-define(COUCH_JOBS_CURRENT, '$couch_jobs_current').
+-define(UNDEFINED_MAX_SCHEDULED_TIME, 1 bsl 36).
+
+
+-type jtx() :: map() | undefined | tuple().
+-type job_id() :: binary().
+-type job_type() :: tuple() | binary() | non_neg_integer().
+-type job() :: map().
+-type job_data() :: map() | undefined.
+-type job_accept_opts() :: map().
+-type scheduled_time() :: non_neg_integer() | undefined.
+-type job_state() :: running | pending | finished.
+-type job_subscription() :: {pid(), reference()}.
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor.erl b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
new file mode 100644
index 0000000..ef82e6b
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
@@ -0,0 +1,133 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_activity_monitor).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/1
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+-record(st, {
+    jtx,
+    type,
+    tref,
+    timeout = 0,
+    vs = not_found
+}).
+
+
+-define(MAX_JITTER_DEFAULT, 10000).
+-define(MISSING_TIMEOUT_CHECK, 5000).
+
+
+start_link(Type) ->
+    gen_server:start_link(?MODULE, [Type], []).
+
+
+%% gen_server callbacks
+
+init([Type]) ->
+    St = #st{jtx = couch_jobs_fdb:get_jtx(), type = Type},
+    {ok, schedule_check(St)}.
+
+
+terminate(_, _St) ->
+    ok.
+
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(check_activity, St) ->
+    St1 = check_activity(St),
+    St2 = schedule_check(St1),
+    {noreply, St2};
+
+handle_info({Ref, ready}, St) when is_reference(Ref) ->
+    % Don't crash out couch_jobs_server and the whole application. We would
+    % need to eventually do proper cleanup in the erlfdb:wait timeout code.
+    LogMsg = "~p : spurious erlfdb future ready message ~p",
+    couch_log:error(LogMsg, [?MODULE, Ref]),
+    {noreply, St};
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+% Private helper functions
+
+check_activity(#st{jtx = JTx, type = Type, vs = not_found} = St) ->
+    NewVS = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_activity_vs(JTx1, Type)
+    end),
+    St#st{vs = NewVS};
+
+check_activity(#st{jtx = JTx, type = Type, vs = VS} = St) ->
+    NewVS = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        NewVS = couch_jobs_fdb:get_activity_vs(JTx1, Type),
+        JobIds = couch_jobs_fdb:get_inactive_since(JTx1, Type, VS),
+        couch_jobs_fdb:re_enqueue_inactive(JTx1, Type, JobIds),
+        NewVS
+    end),
+    St#st{vs = NewVS}.
+
+
+get_timeout_msec(JTx, Type) ->
+    TimeoutVal = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_type_timeout(JTx1, Type)
+    end),
+    case TimeoutVal of
+        not_found -> not_found;
+        ValSeconds -> timer:seconds(ValSeconds)
+    end.
+
+
+schedule_check(#st{jtx = JTx, type = Type, timeout = OldTimeout} = St) ->
+    % Reset versionstamp if timeout changed.
+    St1 = case get_timeout_msec(JTx, Type) of
+        not_found ->
+            St#st{vs = not_found, timeout = ?MISSING_TIMEOUT_CHECK};
+        OldTimeout ->
+            St;
+        NewTimeout ->
+            St#st{vs = not_found, timeout = NewTimeout}
+    end,
+    #st{timeout = Timeout} = St1,
+    MaxJitter = min(Timeout div 2, get_max_jitter_msec()),
+    Wait = Timeout + rand:uniform(max(1, MaxJitter)),
+    St1#st{tref = erlang:send_after(Wait, self(), check_activity)}.
+
+
+get_max_jitter_msec()->
+    config:get_integer("couch_jobs", "activity_monitor_max_jitter_msec",
+        ?MAX_JITTER_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl
new file mode 100644
index 0000000..b11161a
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl
@@ -0,0 +1,64 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_activity_monitor_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0,
+
+    start_monitor/1,
+    stop_monitor/1,
+    get_child_pids/0
+]).
+
+-export([
+    init/1
+]).
+
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+start_monitor(Type) ->
+    supervisor:start_child(?MODULE, [Type]).
+
+
+stop_monitor(Pid) ->
+    supervisor:terminate_child(?MODULE, Pid).
+
+
+get_child_pids() ->
+    lists:map(fun({_Id, Pid, _Type, _Mod}) ->
+        Pid
+    end, supervisor:which_children(?MODULE)).
+
+
+init(_) ->
+    Flags = #{
+        strategy => simple_one_for_one,
+        intensity => 10,
+        period => 3
+    },
+    Children = [
+        #{
+            id => couch_jobs_monitor,
+            restart => temporary,
+            start => {couch_jobs_activity_monitor, start_link, []}
+        }
+    ],
+    {ok, {Flags, Children}}.
diff --git a/src/couch_jobs/src/couch_jobs_app.erl b/src/couch_jobs/src/couch_jobs_app.erl
new file mode 100644
index 0000000..720b948
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_app.erl
@@ -0,0 +1,26 @@
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_app).
+
+
+-behaviour(application).
+
+
+-export([
+    start/2,
+    stop/1
+]).
+
+
+start(_Type, []) ->
+    couch_jobs_sup:start_link().
+
+
+stop([]) ->
+    ok.
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
new file mode 100644
index 0000000..1317d03
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -0,0 +1,679 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_fdb).
+
+
+-export([
+    add/5,
+    remove/2,
+    get_job_state_and_data/2,
+    get_jobs/2,
+    get_jobs/3,
+
+    accept/4,
+    finish/3,
+    resubmit/3,
+    update/3,
+
+    set_type_timeout/3,
+    clear_type_timeout/2,
+    get_type_timeout/2,
+    get_types/1,
+
+    get_activity_vs/2,
+    get_activity_vs_and_watch/2,
+    get_active_since/3,
+    get_inactive_since/3,
+    re_enqueue_inactive/3,
+
+    init_cache/0,
+
+    encode_data/1,
+    decode_data/1,
+
+    get_jtx/0,
+    get_jtx/1,
+    tx/2,
+
+    get_job/2,
+    get_jobs/0
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-record(jv, {
+    seq,
+    jlock,
+    stime,
+    resubmit,
+    data
+}).
+
+
+-define(JOBS_ETS_KEY, jobs).
+-define(MD_TIMESTAMP_ETS_KEY, md_timestamp).
+-define(MD_VERSION_MAX_AGE_SEC, 10).
+-define(PENDING_SEQ, 0).
+
+
+% Data model
+%
+% (?JOBS, ?DATA, Type, JobId) = (Sequence, Lock, SchedTime, Resubmit, JobData)
+% (?JOBS, ?PENDING, Type, ScheduledTime, JobId) = ""
+% (?JOBS, ?WATCHES_PENDING, Type) = Counter
+% (?JOBS, ?WATCHES_ACTIVITY, Type) = Sequence
+% (?JOBS, ?ACTIVITY_TIMEOUT, Type) = ActivityTimeout
+% (?JOBS, ?ACTIVITY, Type, Sequence) = JobId
+%
+% In the ?DATA row Sequence can have these values:
+%  0 - when the job is pending
+%  null - when the job is finished
+%  Versionstamp - when the job is running
+
+
+% Job creation API
+
+% Add a job, or resubmit it if a row for the same type and id already exists.
+%
+% Returns {error, no_type_timeout} when get_type_timeout/2 finds no timeout
+% for Type. For an existing job the result is built from resubmit/3 and the
+% Data argument is not written. For a new job the result is
+% {ok, pending, ?PENDING_SEQ, Data}, or {error, {json_encoding_error, _}} if
+% the data cannot be encoded.
+add(#{jtx := true} = JTx0, Type, JobId, Data, STime) ->
+    JTx = get_jtx(JTx0),
+    #{tx := Tx} = JTx,
+    NewJob = #{job => true, type => Type, id => JobId},
+    case get_type_timeout(JTx, Type) of
+        Timeout when is_integer(Timeout) ->
+            Existing = erlfdb:wait(erlfdb:get(Tx, job_key(JTx, NewJob))),
+            case Existing of
+                not_found ->
+                    try
+                        maybe_enqueue(JTx, Type, JobId, STime, true, Data),
+                        {ok, pending, ?PENDING_SEQ, Data}
+                    catch
+                        error:{json_encoding_error, Reason} ->
+                            {error, {json_encoding_error, Reason}}
+                    end;
+                <<_/binary>> ->
+                    {ok, Resub} = resubmit(JTx, NewJob, STime),
+                    #{seq := Seq, state := State, data := OldData} = Resub,
+                    {ok, State, Seq, OldData}
+            end;
+        not_found ->
+            {error, no_type_timeout}
+    end.
+
+
+% Remove a job: drop its entry from the pending queue (keyed by the stored
+% scheduled time) and clear its data row. Returns ok, or {error, not_found}
+% when there is no data row for the job.
+remove(#{jtx := true} = JTx0, #{job := true} = Job) ->
+    JTx = get_jtx(JTx0),
+    #{tx := Tx} = JTx,
+    #{type := Type, id := JobId} = Job,
+    JobKey = job_key(JTx, Job),
+    case get_job_val(Tx, JobKey) of
+        not_found ->
+            {error, not_found};
+        #jv{stime = STime} ->
+            couch_jobs_pending:remove(JTx, Type, JobId, STime),
+            erlfdb:clear(Tx, JobKey),
+            ok
+    end.
+
+
+% Look up a job and return {ok, Seq, State, Data}, where State is derived
+% from the stored lock and sequence by job_state/2. Returns
+% {error, not_found} if the job has no data row.
+get_job_state_and_data(#{jtx := true} = JTx, #{job := true} = Job) ->
+    JTx1 = get_jtx(JTx),
+    case get_job_val(JTx1, Job) of
+        not_found ->
+            {error, not_found};
+        #jv{seq = Seq, jlock = JLock, data = Data} ->
+            State = job_state(JLock, Seq),
+            {ok, Seq, State, Data}
+    end.
+
+
+% Return all jobs of the given type. Same as get_jobs/3 with a filter that
+% accepts every job id.
+get_jobs(JTx, Type) ->
+    AcceptAll = fun(_) -> true end,
+    get_jobs(JTx, Type, AcceptAll).
+
+
+% Range read all data rows for Type and return a map of
+% JobId => {Seq, State, Data} for the job ids on which Filter returns true.
+% Values are unpacked only for job ids accepted by the filter.
+get_jobs(#{jtx := true} = JTx, Type, Filter) when is_function(Filter, 1) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tuple:pack({?DATA, Type}, Jobs),
+    Opts = [{streaming_mode, want_all}],
+    Rows = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)),
+    Selected = lists:filtermap(fun({K, V}) ->
+        {JobId} = erlfdb_tuple:unpack(K, Prefix),
+        case Filter(JobId) of
+            true ->
+                {Seq, JLock, _, _, Data} = erlfdb_tuple:unpack(V),
+                {true, {JobId, {Seq, job_state(JLock, Seq), Data}}};
+            false ->
+                false
+        end
+    end, Rows),
+    maps:from_list(Selected).
+
+
+% Job processor API
+
+% Dequeue a pending job of the given type and mark it as running.
+%
+% When nothing can be dequeued the {not_found, PendingWatch} result from
+% couch_jobs_pending:dequeue/4 is returned unchanged. Otherwise the job row
+% is given a fresh lock and a versionstamped sequence, an activity entry is
+% written, and {ok, Job, DecodedData} is returned with the lock in Job.
+accept(#{jtx := true} = JTx0, Type, MaxSTime, NoSched)
+        when is_integer(MaxSTime), is_boolean(NoSched) ->
+    JTx = get_jtx(JTx0),
+    #{jtx := true, tx := Tx} = JTx,
+    case couch_jobs_pending:dequeue(JTx, Type, MaxSTime, NoSched) of
+        {ok, JobId} ->
+            JLock = fabric2_util:uuid(),
+            Key = job_key(JTx, Type, JobId),
+            % Assert that the dequeued job is not already locked
+            #jv{jlock = null, data = Data} = Pending = get_job_val(Tx, Key),
+            Running = Pending#jv{
+                seq = ?UNSET_VS,
+                jlock = JLock,
+                resubmit = false
+            },
+            set_job_val(Tx, Key, Running),
+            update_activity(JTx, Type, JobId, null, Data),
+            Job = #{
+                job => true,
+                type => Type,
+                id => JobId,
+                jlock => JLock
+            },
+            {ok, Job, decode_data(Data)};
+        {not_found, PendingWatch} ->
+            {not_found, PendingWatch}
+    end.
+
+
+% Finish a running job held under the lock in Job.
+%
+% Data replaces the stored job data unless it is `undefined`, in which case
+% the stored data is kept. If the stored resubmit flag is set the job goes
+% back to pending, otherwise it is stored as finished. Returns ok,
+% {error, halt} if the job is missing or held under a different lock, or
+% {error, {json_encoding_error, _}} if the data cannot be encoded.
+finish(#{jtx := true} = JTx0, #{jlock := <<_/binary>>} = Job, Data) when
+        is_map(Data) orelse Data =:= undefined ->
+    JTx = get_jtx(JTx0),
+    #{tx := Tx} = JTx,
+    #{type := Type, jlock := JLock, id := JobId} = Job,
+    case get_job_or_halt(Tx, job_key(JTx, Job), JLock) of
+        halt ->
+            {error, halt};
+        #jv{seq = Seq, stime = STime, resubmit = Resubmit, data = OldData} ->
+            Data1 = case Data of
+                undefined -> OldData;
+                _ -> Data
+            end,
+            try maybe_enqueue(JTx, Type, JobId, STime, Resubmit, Data1) of
+                ok ->
+                    clear_activity(JTx, Type, Seq),
+                    update_watch(JTx, Type)
+            catch
+                error:{json_encoding_error, Reason} ->
+                    {error, {json_encoding_error, Reason}}
+            end
+    end.
+
+
+% Resubmit a job. NewSTime replaces the stored scheduled time unless it is
+% `undefined`. What happens depends on the current state of the job:
+%
+%  finished - the job is enqueued as pending again
+%  pending  - the job is moved in the pending queue to the new scheduled time
+%  running  - the scheduled time and a resubmit flag are stored in the job row
+%
+% Returns {ok, Job1} where Job1 is Job extended with state details, or
+% {error, not_found} if the job does not exist.
+resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime) ->
+    JTx = get_jtx(JTx0),
+    #{tx := Tx} = JTx,
+    #{type := Type, id := JobId} = Job,
+    Key = job_key(JTx, Job),
+    case get_job_val(Tx, Key) of
+        not_found ->
+            {error, not_found};
+        #jv{seq = Seq, jlock = JLock, stime = OldSTime, data = Data} = JV ->
+            STime = case NewSTime of
+                undefined -> OldSTime;
+                _ -> NewSTime
+            end,
+            case job_state(JLock, Seq) of
+                running ->
+                    Flagged = JV#jv{stime = STime, resubmit = true},
+                    set_job_val(Tx, Key, Flagged),
+                    Job1 = Job#{
+                        resubmit => true,
+                        stime => STime,
+                        state => running,
+                        seq => Seq,
+                        data => Data
+                    },
+                    {ok, Job1};
+                pending ->
+                    Moved = JV#jv{seq = ?PENDING_SEQ, stime = STime},
+                    set_job_val(Tx, Key, Moved),
+                    couch_jobs_pending:remove(JTx, Type, JobId, OldSTime),
+                    couch_jobs_pending:enqueue(JTx, Type, STime, JobId),
+                    Job1 = Job#{
+                        stime => STime,
+                        seq => ?PENDING_SEQ,
+                        state => pending,
+                        data => Data
+                    },
+                    {ok, Job1};
+                finished ->
+                    ok = maybe_enqueue(JTx, Type, JobId, STime, true, Data),
+                    Job1 = Job#{
+                        seq => ?PENDING_SEQ,
+                        state => pending,
+                        data => Data
+                    },
+                    {ok, Job1}
+            end
+    end.
+
+
+% Update a running job held under the lock in Job.
+%
+% Data0 replaces the stored job data unless it is `undefined`. The job row
+% gets a new versionstamped sequence and its activity entry is replaced.
+% Returns {ok, Job1} with the stored resubmit flag and scheduled time added,
+% {error, halt} if the job is missing or held under a different lock, or
+% {error, {json_encoding_error, _}} if the data cannot be encoded.
+update(#{jtx := true} = JTx0, #{jlock := <<_/binary>>} = Job, Data0) when
+        is_map(Data0) orelse Data0 =:= undefined ->
+    JTx = get_jtx(JTx0),
+    #{tx := Tx} = JTx,
+    #{jlock := JLock, type := Type, id := JobId} = Job,
+    Key = job_key(JTx, Job),
+    case get_job_or_halt(Tx, Key, JLock) of
+        halt ->
+            {error, halt};
+        #jv{seq = Seq, stime = STime, resubmit = Resubmit} = JV0 ->
+            Data = case Data0 of
+                undefined -> JV0#jv.data;
+                _ -> Data0
+            end,
+            Updated = JV0#jv{seq = ?UNSET_VS, data = Data},
+            try set_job_val(Tx, Key, Updated) of
+                ok ->
+                    update_activity(JTx, Type, JobId, Seq, Data),
+                    {ok, Job#{resubmit => Resubmit, stime => STime}}
+            catch
+                error:{json_encoding_error, Reason} ->
+                    {error, {json_encoding_error, Reason}}
+            end
+    end.
+
+
+% Type and activity monitoring API
+
+% Store the activity timeout for a job type as a packed one-element tuple.
+set_type_timeout(#{jtx := true} = JTx, Type, Timeout) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    TimeoutKey = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+    Packed = erlfdb_tuple:pack({Timeout}),
+    erlfdb:set(Tx, TimeoutKey, Packed).
+
+
+% Clear the activity timeout entry of a job type.
+clear_type_timeout(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    TimeoutKey = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+    erlfdb:clear(Tx, TimeoutKey).
+
+
+% Read the activity timeout of a job type via erlfdb:get_ss/2. Returns the
+% stored timeout, or `not_found` if none is set.
+get_type_timeout(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    TimeoutKey = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+    Future = erlfdb:get_ss(Tx, TimeoutKey),
+    case erlfdb:wait(Future) of
+        not_found ->
+            not_found;
+        Packed ->
+            {Timeout} = erlfdb_tuple:unpack(Packed),
+            Timeout
+    end.
+
+
+% List the job types which have an activity timeout entry.
+get_types(#{jtx := true} = JTx) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT}, Jobs),
+    Opts = [{streaming_mode, want_all}],
+    Rows = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)),
+    ToType = fun({K, _V}) ->
+        {Type} = erlfdb_tuple:unpack(K, Prefix),
+        Type
+    end,
+    lists:map(ToType, Rows).
+
+
+% Read the value stored under the activity watch key of a job type. Returns
+% the unpacked versionstamp, or `not_found` if the key is not set.
+get_activity_vs(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    WatchKey = erlfdb_tuple:pack({?WATCHES_ACTIVITY, Type}, Jobs),
+    Future = erlfdb:get(Tx, WatchKey),
+    case erlfdb:wait(Future) of
+        not_found ->
+            not_found;
+        Packed ->
+            {VS} = erlfdb_tuple:unpack(Packed),
+            VS
+    end.
+
+
+% Same read as get_activity_vs/2, but also sets a watch on the key. Returns
+% {VS, Watch}, or {not_found, Watch} if the key is not set. The read is
+% issued before the watch is created, and waited on afterwards.
+get_activity_vs_and_watch(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    WatchKey = erlfdb_tuple:pack({?WATCHES_ACTIVITY, Type}, Jobs),
+    ReadFuture = erlfdb:get(Tx, WatchKey),
+    Watch = erlfdb:watch(Tx, WatchKey),
+    case erlfdb:wait(ReadFuture) of
+        not_found ->
+            {not_found, Watch};
+        Packed ->
+            {VS} = erlfdb_tuple:unpack(Packed),
+            {VS, Watch}
+    end.
+
+
+% Range read the activity entries of Type, starting at the first key greater
+% than or equal to (Type, Versionstamp) and running to the end of the type's
+% range. Each value unpacks to a two-element tuple (update_activity/5 writes
+% {JobId, Data}) and the tuples are collected into a map.
+get_active_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+    StartKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
+    StartKeySel = erlfdb_key:first_greater_or_equal(StartKey),
+    {_, EndKey} = erlfdb_tuple:range({Type}, Prefix),
+    Opts = [{streaming_mode, want_all}],
+    Future = erlfdb:get_range(Tx, StartKeySel, EndKey, Opts),
+    Rows = erlfdb:wait(Future),
+    maps:from_list([erlfdb_tuple:unpack(V) || {_K, V} <- Rows]).
+
+
+% Range read the activity entries of Type from the start of the type's range
+% up to the key selector first_greater_than (Type, Versionstamp). Returns the
+% list of job ids found in those entries.
+get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+    {StartKey, _} = erlfdb_tuple:range({Type}, Prefix),
+    EndKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
+    EndKeySel = erlfdb_key:first_greater_than(EndKey),
+    Opts = [{streaming_mode, want_all}],
+    Future = erlfdb:get_range(Tx, StartKey, EndKeySel, Opts),
+    ToJobId = fun({_K, V}) ->
+        {JobId, _} = erlfdb_tuple:unpack(V),
+        JobId
+    end,
+    lists:map(ToJobId, erlfdb:wait(Future)).
+
+
+% Put the given jobs back into the pending queue. For every job id which
+% still has a data row, its activity entry is cleared and the job is enqueued
+% as pending with its stored scheduled time and data. Job ids without a data
+% row are skipped. The activity watch for Type is bumped only if JobIds is
+% not empty.
+%
+% The job transaction context returned by get_jtx/1 is used for all
+% operations, as in the rest of this module, so that a refreshed context is
+% not discarded in favor of the one passed in by the caller.
+re_enqueue_inactive(#{jtx := true} = JTx0, Type, JobIds)
+        when is_list(JobIds) ->
+    #{tx := Tx} = JTx = get_jtx(JTx0),
+    lists:foreach(fun(JobId) ->
+        case get_job_val(Tx, job_key(JTx, Type, JobId)) of
+            #jv{seq = Seq, stime = STime, data = Data} ->
+                clear_activity(JTx, Type, Seq),
+                maybe_enqueue(JTx, Type, JobId, STime, true, Data);
+            not_found ->
+                ok
+        end
+    end, JobIds),
+    % Match on the list shape instead of walking it with length/1
+    case JobIds of
+        [_ | _] -> update_watch(JTx, Type);
+        [] -> ok
+    end.
+
+
+% Cache initialization API. Called from the supervisor just to create the ETS
+% table. It returns `ignore` to tell supervisor it won't actually start any
+% process, which is what we want here.
+%
+init_cache() ->
+    % Public named table, tuned for concurrent readers and writers
+    TableOpts = [
+        public,
+        named_table,
+        {read_concurrency, true},
+        {write_concurrency, true}
+    ],
+    ets:new(?MODULE, TableOpts),
+    ignore.
+
+
+% Functions to encode / decode JobData
+%
+encode_data(#{} = JobData) ->
+    try
+        jiffy:encode(JobData)
+    catch
+        % Both exception classes are handled: the `throw` form is a legacy
+        % case since new versions of jiffy raise `error` instead.
+        Class:{error, Reason} when Class =:= throw; Class =:= error ->
+            error({json_encoding_error, Reason})
+    end.
+
+
+% Decode job data. Binaries are decoded as JSON into maps, while data which
+% is already a map is returned as is.
+decode_data(<<_/binary>> = Encoded) ->
+    jiffy:decode(Encoded, [return_maps]);
+
+decode_data(#{} = Decoded) ->
+    Decoded.
+
+
+% Cached job transaction object. This object wraps a transaction, caches the
+% directory lookup path, and the metadata version. The function can be used
+% from inside or outside the transaction. When used from a transaction it will
+% verify if the metadata was changed, and will refresh automatically.
+%
+get_jtx() ->
+    % No transaction available: get_jtx/1 handles the `undefined` case
+    NoTx = undefined,
+    get_jtx(NoTx).
+
+
+% Accepts a map with a `tx` field, `undefined`, or an erlfdb transaction.
+%
+% A map is unwrapped to its `tx` value. With `undefined` the cached context
+% is returned, creating and caching it first if needed, always with
+% `tx` set to `undefined`. With a transaction the cached context is bound to
+% that transaction and checked by ensure_current/1; if nothing is cached a
+% new context is created from the transaction and cached.
+get_jtx(#{tx := Tx} = _TxDb) ->
+    get_jtx(Tx);
+
+get_jtx(undefined = _Tx) ->
+    Cached = ets:lookup(?MODULE, ?JOBS_ETS_KEY),
+    case Cached of
+        [] ->
+            Fresh = update_jtx_cache(init_jtx(undefined)),
+            Fresh#{tx := undefined};
+        [{_, #{} = JTx}] ->
+            JTx
+    end;
+
+get_jtx({erlfdb_transaction, _} = Tx) ->
+    Cached = ets:lookup(?MODULE, ?JOBS_ETS_KEY),
+    case Cached of
+        [] ->
+            update_jtx_cache(init_jtx(Tx));
+        [{_, #{} = JTx}] ->
+            ensure_current(JTx#{tx := Tx})
+    end.
+
+
+% Transaction processing to be used with couch jobs' specific transaction
+% contexts
+%
+tx(#{jtx := true} = JTx, TxFun) when is_function(TxFun, 1) ->
+    % Delegates to fabric2_fdb, passing the job transaction context through
+    fabric2_fdb:transactional(JTx, TxFun).
+
+
+% Debug and testing API
+
+% Fetch a single job in its own transaction, bypassing the cached context.
+% Returns a map with all the stored job fields, decoded data and derived
+% state, or `not_found`.
+get_job(Type, JobId) ->
+    fabric2_fdb:transactional(fun(Tx) ->
+        JTx = init_jtx(Tx),
+        Key = job_key(JTx, Type, JobId),
+        case get_job_val(Tx, Key) of
+            not_found ->
+                not_found;
+            #jv{seq = Seq, jlock = JLock} = JV ->
+                #jv{stime = STime, resubmit = Resubmit, data = Data} = JV,
+                #{
+                    job => true,
+                    type => Type,
+                    id => JobId,
+                    seq => Seq,
+                    jlock => JLock,
+                    stime => STime,
+                    resubmit => Resubmit,
+                    data => decode_data(Data),
+                    state => job_state(JLock, Seq)
+                }
+        end
+    end).
+
+
+% Fetch all jobs of all types in a single transaction. Returns a list of
+% {Type, JobId, State, DecodedData} tuples.
+get_jobs() ->
+    fabric2_fdb:transactional(fun(Tx) ->
+        #{jobs_path := Jobs} = init_jtx(Tx),
+        Prefix = erlfdb_tuple:pack({?DATA}, Jobs),
+        Opts = [{streaming_mode, want_all}],
+        Rows = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)),
+        ToJob = fun({K, V}) ->
+            {Type, JobId} = erlfdb_tuple:unpack(K, Prefix),
+            {Seq, JLock, _, _, Data} = erlfdb_tuple:unpack(V),
+            {Type, JobId, job_state(JLock, Seq), decode_data(Data)}
+        end,
+        lists:map(ToJob, Rows)
+    end).
+
+
+% Private helper functions
+
+% Write the job row unlocked and with the resubmit flag cleared. When
+% Resubmit is true the row gets the pending sequence and the job is added to
+% the pending queue; otherwise the row gets a null sequence, which marks the
+% job as finished.
+maybe_enqueue(#{jtx := true} = JTx, Type, JobId, STime, Resubmit, Data) ->
+    #{tx := Tx} = JTx,
+    Key = job_key(JTx, Type, JobId),
+    Seq = case Resubmit of
+        true -> ?PENDING_SEQ;
+        false -> null
+    end,
+    JV = #jv{
+        seq = Seq,
+        jlock = null,
+        stime = STime,
+        resubmit = false,
+        data = Data
+    },
+    set_job_val(Tx, Key, JV),
+    case Resubmit of
+        true -> couch_jobs_pending:enqueue(JTx, Type, STime, JobId);
+        false -> ok
+    end,
+    ok.
+
+
+% Build the data row key of a job from its type and id.
+job_key(#{jtx := true, jobs_path := Jobs}, Type, JobId) ->
+    KeyTuple = {?DATA, Type, JobId},
+    erlfdb_tuple:pack(KeyTuple, Jobs).
+
+
+% Build the data row key of a job given as a map.
+job_key(JTx, #{} = Job) ->
+    #{type := Type, id := JobId} = Job,
+    job_key(JTx, Type, JobId).
+
+
+% Read a job row and return it as a #jv{} record, or `not_found`. Accepts
+% either a job transaction context and a job map, or a transaction and an
+% already packed key.
+get_job_val(#{jtx := true, tx := Tx} = JTx, #{job := true} = Job) ->
+    Key = job_key(JTx, Job),
+    get_job_val(Tx, Key);
+
+get_job_val(Tx = {erlfdb_transaction, _}, Key) ->
+    Future = erlfdb:get(Tx, Key),
+    case erlfdb:wait(Future) of
+        not_found ->
+            not_found;
+        <<_/binary>> = Packed ->
+            {Seq, JLock, STime, Resubmit, Data} = erlfdb_tuple:unpack(Packed),
+            #jv{
+                seq = Seq,
+                jlock = JLock,
+                stime = STime,
+                resubmit = Resubmit,
+                data = Data
+            }
+    end.
+
+
+% Write a job row. Map data is encoded first, binary data is stored as is.
+% When the sequence is ?UNSET_VS the row is written as a versionstamped
+% value, otherwise as a regular value. Always returns ok.
+set_job_val(Tx = {erlfdb_transaction, _}, Key, #jv{} = JV) ->
+    #jv{seq = Seq, jlock = JLock, stime = STime, resubmit = Resubmit} = JV,
+    Data = case JV#jv.data of
+        #{} = Map -> encode_data(Map);
+        <<_/binary>> = Bin -> Bin
+    end,
+    Row = {Seq, JLock, STime, Resubmit, Data},
+    case Seq of
+        ?UNSET_VS ->
+            Packed = erlfdb_tuple:pack_vs(Row),
+            erlfdb:set_versionstamped_value(Tx, Key, Packed);
+        _Other ->
+            Packed = erlfdb_tuple:pack(Row),
+            erlfdb:set(Tx, Key, Packed)
+    end,
+    ok.
+
+
+% Read a job row and return it only if it is held under the given lock.
+% Returns `halt` if the job does not exist or its stored lock differs.
+get_job_or_halt(Tx, Key, JLock) ->
+    case get_job_val(Tx, Key) of
+        not_found ->
+            halt;
+        #jv{jlock = JLock} = JV ->
+            JV;
+        #jv{} ->
+            halt
+    end.
+
+
+% Replace the activity entry of a job: clear the entry at the old sequence
+% (unless Seq is null), write a new entry under a versionstamped key holding
+% {JobId, Data}, then bump the activity watch for the type. Map data is
+% encoded before it is stored.
+update_activity(#{jtx := true} = JTx, Type, JobId, Seq, Data0) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    case Seq of
+        null -> ok;
+        _ -> clear_activity(JTx, Type, Seq)
+    end,
+    ActivityKey = erlfdb_tuple:pack_vs({?ACTIVITY, Type, ?UNSET_VS}, Jobs),
+    Data = case Data0 of
+        <<_/binary>> -> Data0;
+        #{} -> encode_data(Data0)
+    end,
+    Packed = erlfdb_tuple:pack({JobId, Data}),
+    erlfdb:set_versionstamped_key(Tx, ActivityKey, Packed),
+    update_watch(JTx, Type).
+
+
+% Clear the activity entry stored for Type at the given sequence.
+clear_activity(#{jtx := true} = JTx, Type, Seq) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    ActivityKey = erlfdb_tuple:pack({?ACTIVITY, Type, Seq}, Jobs),
+    erlfdb:clear(Tx, ActivityKey).
+
+
+% Write a new versionstamped value to the activity watch key of the type.
+% Always returns ok.
+update_watch(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    WatchKey = erlfdb_tuple:pack({?WATCHES_ACTIVITY, Type}, Jobs),
+    Stamp = erlfdb_tuple:pack_vs({?UNSET_VS}),
+    erlfdb:set_versionstamped_value(Tx, WatchKey, Stamp),
+    ok.
+
+
+% Derive the job state from its lock and sequence:
+%  no lock, null sequence     - finished
+%  no lock, any other sequence - pending
+%  any lock                   - running
+job_state(null, null) -> finished;
+job_state(null, _Seq) -> pending;
+job_state(_JLock, _Seq) -> running.
+
+
+% This is a transaction context object similar to the Db = #{} one from
+% fabric2_fdb. It is used to cache the jobs path directory (to avoid extra
+% lookups on every operation) and to check for metadata changes (in case the
+% directory changes).
+%
+init_jtx(undefined) ->
+    % No transaction given, so run in a new one
+    fabric2_fdb:transactional(fun init_jtx/1);
+
+init_jtx({erlfdb_transaction, _} = Tx) ->
+    Root = erlfdb_directory:root(),
+    CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+    LayerPrefix = erlfdb_directory:get_name(CouchDB),
+    JobsPath = erlfdb_tuple:pack({?JOBS}, LayerPrefix),
+    MdVersion = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
+    % layer_prefix, md_version and tx here match db map fields in fabric2_fdb
+    % but we also assert that this is a job transaction using the jtx => true
+    % field
+    #{
+        jtx => true,
+        tx => Tx,
+        layer_prefix => LayerPrefix,
+        jobs_path => JobsPath,
+        md_version => MdVersion
+    }.
+
+
+% Run update_current/1 unless it was already done for this transaction. The
+% transaction last checked is remembered in the process dictionary under
+% ?COUCH_JOBS_CURRENT.
+ensure_current(#{jtx := true, tx := Tx} = JTx) ->
+    case get(?COUCH_JOBS_CURRENT) =:= Tx of
+        true ->
+            JTx;
+        false ->
+            Refreshed = update_current(JTx),
+            put(?COUCH_JOBS_CURRENT, Tx),
+            Refreshed
+    end.
+
+
+% Check the cached metadata version against the database, at most once every
+% ?MD_VERSION_MAX_AGE_SEC seconds. If the version changed the context is
+% rebuilt and cached again, otherwise only its timestamp is refreshed.
+update_current(#{tx := Tx, md_version := Version} = JTx) ->
+    Age = get_md_version_age(Version),
+    case Age =< ?MD_VERSION_MAX_AGE_SEC of
+        true ->
+            % Looked it up not too long ago. Avoid looking it up too
+            % frequently
+            JTx;
+        false ->
+            Current = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
+            case Current of
+                Version ->
+                    update_md_version_timestamp(Version),
+                    JTx;
+                _NewVersion ->
+                    update_jtx_cache(init_jtx(Tx))
+            end
+    end.
+
+
+% Cache the context in ETS with its transaction removed, record when its
+% metadata version was seen, and return the context unchanged.
+update_jtx_cache(#{jtx := true, md_version := Version} = JTx) ->
+    Entry = {?JOBS_ETS_KEY, JTx#{tx := undefined}},
+    ets:insert(?MODULE, Entry),
+    update_md_version_timestamp(Version),
+    JTx.
+
+
+% Seconds since the given metadata version was last recorded. If no
+% timestamp is recorded for this exact version the timestamp is taken as 0,
+% so the result is the current system time in seconds.
+get_md_version_age(Version) ->
+    LastSeen = case ets:lookup(?MODULE, ?MD_TIMESTAMP_ETS_KEY) of
+        [{_, Version, Ts}] -> Ts;
+        _ -> 0
+    end,
+    Now = erlang:system_time(second),
+    Now - LastSeen.
+
+
+% Record the current system time, in seconds, for the metadata version.
+update_md_version_timestamp(Version) ->
+    Now = erlang:system_time(second),
+    Entry = {?MD_TIMESTAMP_ETS_KEY, Version, Now},
+    ets:insert(?MODULE, Entry).
diff --git a/src/couch_jobs/src/couch_jobs_notifier.erl b/src/couch_jobs/src/couch_jobs_notifier.erl
new file mode 100644
index 0000000..1c554a0
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_notifier.erl
@@ -0,0 +1,285 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_notifier).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/1,
+    subscribe/4,
+    unsubscribe/2
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-define(TYPE_MONITOR_HOLDOFF_DEFAULT, 50).
+-define(TYPE_MONITOR_TIMEOUT_DEFAULT, "infinity").
+-define(GET_JOBS_RANGE_RATIO, 0.5).
+
+
+-record(st, {
+    jtx,
+    type,
+    monitor_pid,
+    subs, % #{JobId => #{Ref => {Pid, State, Seq}}}
+    pidmap, % #{{Jobid, Pid} => Ref}
+    refmap % #{Ref => JobId}
+}).
+
+
+% Start an unregistered notifier server for the given job type.
+start_link(Type) ->
+    InitArgs = [Type],
+    gen_server:start_link(?MODULE, InitArgs, []).
+
+
+% Subscribe the calling process to updates of a job. State and Seq are the
+% job state and sequence last seen by the caller. Returns
+% {ok, {Server, Ref}}, or {error, Error} if no notifier server could be
+% obtained for the type.
+subscribe(Type, JobId, State, Seq) ->
+    case couch_jobs_server:get_notifier_server(Type) of
+        {error, Error} ->
+            {error, Error};
+        {ok, Server} ->
+            Request = {subscribe, JobId, State, Seq, self()},
+            Ref = gen_server:call(Server, Request, infinity),
+            {ok, {Server, Ref}}
+    end.
+
+
+% Remove the subscription identified by Ref on behalf of the calling process.
+unsubscribe(Server, Ref) when is_reference(Ref) ->
+    Request = {unsubscribe, Ref, self()},
+    gen_server:call(Server, Request, infinity).
+
+
+% gen_server callback. Starts with no subscribers, reads the current
+% activity versionstamp of the type and starts a type monitor from it.
+init([Type]) ->
+    St0 = #st{
+        jtx = couch_jobs_fdb:get_jtx(),
+        type = Type,
+        subs = #{},
+        pidmap = #{},
+        refmap = #{}
+    },
+    VS = get_type_vs(St0),
+    HoldOff = get_holdoff(),
+    Timeout = get_timeout(),
+    MonitorPid = couch_jobs_type_monitor:start(Type, VS, HoldOff, Timeout),
+    {ok, St0#st{monitor_pid = MonitorPid}}.
+
+
+% gen_server callback. Nothing to clean up.
+terminate(_Reason, _St) ->
+    ok.
+
+
+% subscribe: a {JobId, Pid} pair has at most one subscription. A new pair is
+% monitored and the monitor reference becomes the subscription reference; an
+% existing pair only has its last seen state and sequence updated.
+handle_call({subscribe, JobId, State, Seq, Pid}, _From, #st{} = St) ->
+    #st{pidmap = PidMap, refmap = RefMap} = St,
+    case maps:find({JobId, Pid}, PidMap) of
+        {ok, Ref} when is_reference(Ref) ->
+            {reply, Ref, update_sub(JobId, Ref, Pid, State, Seq, St)};
+        error ->
+            Ref = erlang:monitor(process, Pid),
+            St1 = update_sub(JobId, Ref, Pid, State, Seq, St),
+            St2 = St1#st{
+                pidmap = PidMap#{{JobId, Pid} => Ref},
+                refmap = RefMap#{Ref => JobId}
+            },
+            {reply, Ref, St2}
+    end;
+
+handle_call({unsubscribe, Ref, Pid}, _From, #st{} = St) ->
+    St1 = unsubscribe_int(Ref, Pid, St),
+    {reply, ok, St1};
+
+% Unknown calls stop the server, and the caller gets the same bad_call term
+handle_call(Msg, _From, St) ->
+    BadCall = {bad_call, Msg},
+    {stop, BadCall, BadCall, St}.
+
+
+% No casts are expected, any cast stops the server.
+handle_cast(Msg, St) ->
+    Reason = {bad_cast, Msg},
+    {stop, Reason, St}.
+
+
+% Collapse all queued type_updated messages into the largest versionstamp and
+% notify subscribers once.
+handle_info({type_updated, VS}, St) ->
+    VSMax = flush_type_updated_messages(VS),
+    St1 = notify_subscribers(VSMax, St),
+    {noreply, St1};
+
+handle_info({Ref, ready}, St) when is_reference(Ref) ->
+    % A stray erlfdb future ready message is logged and dropped instead of
+    % stopping the server as other unexpected messages do. Proper cleanup
+    % would eventually belong in the erlfdb:wait timeout code.
+    LogMsg = "~p : spurious erlfdb future ready message ~p",
+    couch_log:error(LogMsg, [?MODULE, Ref]),
+    {noreply, St};
+
+% A subscriber exited, drop its subscription
+handle_info({'DOWN', Ref, process, Pid, _}, #st{} = St) ->
+    St1 = unsubscribe_int(Ref, Pid, St),
+    {noreply, St1};
+
+handle_info(Msg, St) ->
+    Reason = {bad_info, Msg},
+    {stop, Reason, St}.
+
+
+% gen_server callback. The state is kept as is.
+code_change(_OldVsn, #st{} = St, _Extra) ->
+    {ok, St};
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+% Store the subscription map of a job. A job with no subscriptions left is
+% removed from the state instead of being kept with an empty map.
+update_subs(JobId, Refs, #st{subs = Subs} = St) when map_size(Refs) > 0 ->
+    NewSubs = Subs#{JobId => Refs},
+    St#st{subs = NewSubs};
+
+update_subs(JobId, Refs, #st{subs = Subs} = St) when map_size(Refs) =:= 0 ->
+    NewSubs = maps:remove(JobId, Subs),
+    St#st{subs = NewSubs}.
+
+
+% Add or replace the subscription Ref of a job with the given subscriber
+% pid, last seen state and sequence.
+update_sub(JobId, Ref, Pid, State, Seq, #st{subs = Subs} = St) ->
+    Refs0 = maps:get(JobId, Subs, #{}),
+    Refs1 = Refs0#{Ref => {Pid, State, Seq}},
+    update_subs(JobId, Refs1, St).
+
+
+% Remove the subscription Ref of a job. The state is returned unchanged if
+% the job has no subscriptions.
+remove_sub(JobId, Ref, #st{subs = Subs} = St) ->
+    case maps:get(JobId, Subs, not_found) of
+        #{} = Refs ->
+            Remaining = maps:remove(Ref, Refs),
+            update_subs(JobId, Remaining, St);
+        not_found ->
+            St
+    end.
+
+
+% Remove a subscription of a known job id: drop it from the subscriptions,
+% stop monitoring the subscriber (flushing a queued 'DOWN' message) and
+% remove it from both lookup maps.
+unsubscribe_int(Id, Ref, Pid, #st{pidmap = PidMap, refmap = RefMap} = St) ->
+    St1 = remove_sub(Id, Ref, St),
+    erlang:demonitor(Ref, [flush]),
+    PidMap1 = maps:remove({Id, Pid}, PidMap),
+    RefMap1 = maps:remove(Ref, RefMap),
+    St1#st{pidmap = PidMap1, refmap = RefMap1}.
+
+
+% Remove a subscription by reference only. The job id is looked up in the
+% reference map; an unknown reference leaves the state unchanged.
+unsubscribe_int(Ref, Pid, #st{refmap = RefMap} = St) ->
+    case maps:get(Ref, RefMap, not_found) of
+        not_found ->
+            St;
+        JobId ->
+            unsubscribe_int(JobId, Ref, Pid, St)
+    end.
+
+
+% Drain all type_updated messages currently in the mailbox, without waiting,
+% and return the largest versionstamp among them and the given one.
+flush_type_updated_messages(Largest) ->
+    receive
+        {type_updated, VS} ->
+            flush_type_updated_messages(max(VS, Largest))
+    after 0 ->
+        Largest
+    end.
+
+
+% Fetch {Seq, State, Data} for every job id in InactiveIdMap. Jobs which no
+% longer exist map to {null, not_found, not_found}.
+%
+% Ratio picks the read strategy: at or above ?GET_JOBS_RANGE_RATIO all the
+% jobs of the type are range read once and filtered, below it every job is
+% read individually.
+get_jobs(#st{jtx = JTx, type = Type}, InactiveIdMap, Ratio)
+        when Ratio >= ?GET_JOBS_RANGE_RATIO ->
+    Wanted = fun(JobId) -> maps:is_key(JobId, InactiveIdMap) end,
+    Found = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_jobs(JTx1, Type, Wanted)
+    end),
+    Missing = {null, not_found, not_found},
+    maps:map(fun(JobId, _) ->
+        maps:get(JobId, Found, Missing)
+    end, InactiveIdMap);
+
+get_jobs(#st{jtx = JTx, type = Type}, InactiveIdMap, _) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        Lookup = fun(JobId, _) ->
+            Job = #{job => true, type => Type, id => JobId},
+            case couch_jobs_fdb:get_job_state_and_data(JTx1, Job) of
+                {error, not_found} ->
+                    {null, not_found, not_found};
+                {ok, Seq, State, Data} ->
+                    {Seq, State, Data}
+            end
+        end,
+        maps:map(Lookup, InactiveIdMap)
+    end).
+
+
+% Read the activity versionstamp of this notifier's job type in a
+% transaction.
+get_type_vs(#st{jtx = JTx, type = Type}) ->
+    ReadVS = fun(JTx1) ->
+        couch_jobs_fdb:get_activity_vs(JTx1, Type)
+    end,
+    couch_jobs_fdb:tx(JTx, ReadVS).
+
+
+% "Active since" is the set of jobs that have been active (running)
+% and updated at least once since the given versionstamp. These are relatively
+% cheap to find as it's just a range read in the ?ACTIVITY subspace.
+%
+get_active_since(#st{} = _St, not_found) ->
+    % No versionstamp to read from
+    #{};
+
+get_active_since(#st{jtx = JTx, type = Type, subs = Subs}, VS) ->
+    Updated = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_active_since(JTx1, Type, VS)
+    end),
+    % Only jobs with subscribers are of interest. Each is reported as running
+    % with the given versionstamp as its sequence.
+    Subscribed = maps:with(maps:keys(Subs), Updated),
+    maps:map(fun(_JobId, Data) -> {VS, running, Data} end, Subscribed).
+
+
+% Notify subscribers after a type update. Does nothing without subscribers.
+notify_subscribers(_, #st{subs = Subs} = St) when map_size(Subs) =:= 0 ->
+    St;
+
+notify_subscribers(ActiveVS, #st{} = St1) ->
+    % The active jobs are cheap to gather, so handle them first. Whatever
+    % subscribed job is not among them is then looked up to get its state.
+    Active = get_active_since(St1, ActiveVS),
+    St2 = notify_job_ids(Active, St1),
+    #st{subs = Subs} = St2,
+    InactiveIdMap = maps:without(maps:keys(Active), Subs),
+    InactiveRatio = maps:size(InactiveIdMap) / maps:size(Subs),
+    Inactive = get_jobs(St2, InactiveIdMap, InactiveRatio),
+    notify_job_ids(Inactive, St2).
+
+
+% Notify the subscribers of every job in Jobs, a map of
+% JobId => {VS, State, Data}, and return the updated server state.
+%
+% For each subscription of a job, in clause order:
+%  - both the subscriber's last seen state and the new state are running:
+%    skip if the last seen sequence is not older than VS, otherwise notify
+%    and record VS
+%  - the state did not change: skip
+%  - anything else: notify, then either drop the subscription (if the new
+%    state is finished or not_found) or record the new state and VS
+notify_job_ids(#{} = Jobs, #st{type = Type} = St0) ->
+    maps:fold(fun(Id, {VS, State, Data}, #st{} = StAcc) ->
+        % Subscriptions end when the job is finished or no longer exists
+        DoUnsub = lists:member(State, [finished, not_found]),
+        maps:fold(fun
+            (_Ref, {_Pid, running, OldVS}, St) when State =:= running,
+                    OldVS >= VS ->
+                St;
+            (Ref, {Pid, running, OldVS}, St) when State =:= running,
+                    OldVS < VS ->
+                % For running state send updates even if state doesn't change
+                notify(Pid, Ref, Type, Id, State, Data),
+                update_sub(Id, Ref, Pid, running, VS, St);
+            (_Ref, {_Pid, OldState, _VS}, St) when OldState =:= State ->
+                St;
+            (Ref, {Pid, _State, _VS}, St) ->
+                notify(Pid, Ref, Type, Id, State, Data),
+                case DoUnsub of
+                    true -> unsubscribe_int(Id, Ref, Pid, St);
+                    false -> update_sub(Id, Ref, Pid, State, VS, St)
+                end
+        % The inner fold walks the job's subscriptions as they were when the
+        % job was reached, while St carries the updates made along the way
+        end, StAcc, maps:get(Id, StAcc#st.subs, #{}))
+    end, St0, Jobs).
+
+
+% Send a job event message to a subscriber.
+notify(Pid, Ref, Type, Id, State, Data) ->
+    Event = {?COUCH_JOBS_EVENT, Ref, Type, Id, State, Data},
+    Pid ! Event.
+
+
+% Read the type monitor holdoff from the config, falling back to
+% ?TYPE_MONITOR_HOLDOFF_DEFAULT.
+get_holdoff() ->
+    Default = ?TYPE_MONITOR_HOLDOFF_DEFAULT,
+    config:get_integer("couch_jobs", "type_monitor_holdoff_msec", Default).
+
+
+% Read the type monitor timeout from the config. The value is either the
+% string "infinity", returned as the atom, or an integer string.
+get_timeout() ->
+    Value = config:get("couch_jobs", "type_monitor_timeout_msec",
+        ?TYPE_MONITOR_TIMEOUT_DEFAULT),
+    case Value of
+        "infinity" -> infinity;
+        _ -> list_to_integer(Value)
+    end.
diff --git a/src/couch_jobs/src/couch_jobs_notifier_sup.erl b/src/couch_jobs/src/couch_jobs_notifier_sup.erl
new file mode 100644
index 0000000..81d9349
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_notifier_sup.erl
@@ -0,0 +1,64 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_notifier_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0,
+
+    start_notifier/1,
+    stop_notifier/1,
+    get_child_pids/0
+]).
+
+-export([
+    init/1
+]).
+
+
+% Start the notifier supervisor, registered locally under the module name.
+start_link() ->
+    SupName = {local, ?MODULE},
+    supervisor:start_link(SupName, ?MODULE, []).
+
+
+% Ask the supervisor to start a new child, passing Type as the extra start
+% argument.
+start_notifier(Type) ->
+    ExtraArgs = [Type],
+    supervisor:start_child(?MODULE, ExtraArgs).
+
+
+% Terminate the child process identified by Pid.
+stop_notifier(Pid) ->
+    Sup = ?MODULE,
+    supervisor:terminate_child(Sup, Pid).
+
+
+% Return the pid element of every entry reported by
+% supervisor:which_children/1 for this supervisor.
+get_child_pids() ->
+    Children = supervisor:which_children(?MODULE),
+    [Pid || {_Id, Pid, _Type, _Mod} <- Children].
+
+
+% Supervisor callback. Children are added dynamically (simple_one_for_one)
+% and are not restarted when they exit (temporary).
+init(_) ->
+    ChildSpec = #{
+        id => couch_jobs_notifier,
+        restart => temporary,
+        start => {couch_jobs_notifier, start_link, []}
+    },
+    SupFlags = #{
+        strategy => simple_one_for_one,
+        intensity => 10,
+        period => 3
+    },
+    {ok, {SupFlags, [ChildSpec]}}.
diff --git a/src/couch_jobs/src/couch_jobs_pending.erl b/src/couch_jobs/src/couch_jobs_pending.erl
new file mode 100644
index 0000000..ab53c59
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_pending.erl
@@ -0,0 +1,143 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_pending).
+
+
+-export([
+    enqueue/4,
+    dequeue/4,
+    remove/4
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-define(RANGE_LIMIT, 1024).
+
+
+% Add JobId to the pending queue of Type, keyed by its scheduled time STime,
+% then add 1 to the per-type pending counter key. That counter key is the one
+% get_pending_watch/2 sets a watch on. Always returns ok.
+enqueue(#{jtx := true} = JTx, Type, STime, JobId) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    PendingKey = erlfdb_tuple:pack({?PENDING, Type, STime, JobId}, Jobs),
+    erlfdb:set(Tx, PendingKey, <<>>),
+    CounterKey = erlfdb_tuple:pack({?WATCHES_PENDING, Type}, Jobs),
+    erlfdb:add(Tx, CounterKey, 1),
+    ok.
+
+
+% Remove one pending job of Type from the queue.
+%
+% Returns {ok, JobId} when a job was dequeued, or {not_found, Watch} where
+% Watch is the value returned by get_pending_watch/2.
+%
+% First clause (fourth argument is `true`): only keys stored with a scheduled
+% time of 0 are considered and one is picked with get_random_item/2. The third
+% argument is ignored.
+dequeue(#{jtx := true} = JTx, Type, _, true) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    Prefix = erlfdb_tuple:pack({?PENDING, Type, 0}, Jobs),
+    case get_random_item(Tx, Prefix) of
+        {error, not_found} ->
+            {not_found, get_pending_watch(JTx, Type)};
+        {ok, PendingKey} ->
+            erlfdb:clear(Tx, PendingKey),
+            % What remains after the {?PENDING, Type, 0} prefix is the job id
+            {JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+            {ok, JobId}
+    end;
+
+% Second clause: pick a random key from the range that starts after the
+% {?PENDING, Type} prefix and ends at {MaxPriority, <<16#FF>>}. The tuple
+% element compared against MaxPriority is the scheduled time written by
+% enqueue/4.
+dequeue(#{jtx := true} = JTx, Type, MaxPriority, _) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs),
+    StartKeySel = erlfdb_key:first_greater_than(Prefix),
+    End = erlfdb_tuple:pack({MaxPriority, <<16#FF>>}, Prefix),
+    EndKeySel = erlfdb_key:first_greater_or_equal(End),
+    case clear_random_key_from_range(Tx, StartKeySel, EndKeySel) of
+        {error, not_found} ->
+            {not_found, get_pending_watch(JTx, Type)};
+        {ok, PendingKey} ->
+            % Key tail is {ScheduledTime, JobId}; only the job id is returned
+            {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+            {ok, JobId}
+    end.
+
+
+% Clear the pending queue entry for JobId scheduled at STime. Note that the
+% argument order (JobId, STime) is the reverse of enqueue/4.
+remove(#{jtx := true} = JTx, Type, JobId, STime) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    PendingKey = erlfdb_tuple:pack({?PENDING, Type, STime, JobId}, Jobs),
+    erlfdb:clear(Tx, PendingKey).
+
+
+%% Private functions
+
+
+% Pick a random item from the range without reading the keys in first. The
+% constraint is that the IDs should look like random UUIDs.
+%
+% Returns {ok, Key} or {error, not_found}.
+get_random_item(Tx, Prefix) ->
+    Id = fabric2_util:uuid(),
+    Snapshot = erlfdb:snapshot(Tx),
+    % Try to be fair and switch evenly between trying ids before or after the
+    % randomly generated one. Otherwise, always trying "before" first would
+    % leave a lot of <<"fff...">> IDs in the queue for too long, and always
+    % trying "after" first would leave a lot of <<"00...">> ones waiting.
+    {TryFirst, TrySecond} = case rand:uniform() > 0.5 of
+        true -> {fun get_after/3, fun get_before/3};
+        false -> {fun get_before/3, fun get_after/3}
+    end,
+    case TryFirst(Snapshot, Prefix, Id) of
+        {ok, Key} -> {ok, Key};
+        {error, not_found} -> TrySecond(Snapshot, Prefix, Id)
+    end.
+
+
+% Resolve the last key less than or equal to {Id} under Prefix. The result is
+% only accepted if it still starts with Prefix; otherwise the selector landed
+% outside the queue and {error, not_found} is returned.
+get_before(Snapshot, Prefix, Id) ->
+    IdKey = erlfdb_tuple:pack({Id}, Prefix),
+    Selector = erlfdb_key:last_less_or_equal(IdKey),
+    Size = byte_size(Prefix),
+    Found = erlfdb:wait(erlfdb:get_key(Snapshot, Selector)),
+    case Found of
+        <<Prefix:Size/binary, _/binary>> -> {ok, Found};
+        _ -> {error, not_found}
+    end.
+
+
+% Resolve the first key greater than or equal to {Id} under Prefix. The result
+% is only accepted if it still starts with Prefix; otherwise the selector
+% landed outside the queue and {error, not_found} is returned.
+get_after(Snapshot, Prefix, Id) ->
+    IdKey = erlfdb_tuple:pack({Id}, Prefix),
+    Selector = erlfdb_key:first_greater_or_equal(IdKey),
+    Size = byte_size(Prefix),
+    Found = erlfdb:wait(erlfdb:get_key(Snapshot, Selector)),
+    case Found of
+        <<Prefix:Size/binary, _/binary>> -> {ok, Found};
+        _ -> {error, not_found}
+    end.
+
+
+% Read up to ?RANGE_LIMIT keys from the range as a snapshot read, then
+% randomly pick one of them to clear. Before clearing, add a read conflict on
+% the chosen key in case other workers have picked the same key.
+%
+% Returns {ok, Key} or {error, not_found} when the range is empty.
+clear_random_key_from_range(Tx, Start, End) ->
+    Opts = [{limit, ?RANGE_LIMIT}, {snapshot, true}],
+    case erlfdb:wait(erlfdb:get_range(Tx, Start, End, Opts)) of
+        [] ->
+            {error, not_found};
+        [{OnlyKey, _}] ->
+            % Single candidate, no random pick needed
+            conflict_and_clear(Tx, OnlyKey);
+        [{_, _} | _] = KVs ->
+            Index = rand:uniform(length(KVs)),
+            {Picked, _} = lists:nth(Index, KVs),
+            conflict_and_clear(Tx, Picked)
+    end.
+
+
+% Register a read conflict on Key, clear it and return {ok, Key}.
+conflict_and_clear(Tx, Key) ->
+    erlfdb:add_read_conflict_key(Tx, Key),
+    erlfdb:clear(Tx, Key),
+    {ok, Key}.
+
+
+% Set a watch on the per-type pending counter key (the key enqueue/4 adds 1
+% to) and return the result of erlfdb:watch/2.
+get_pending_watch(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = couch_jobs_fdb:get_jtx(JTx),
+    CounterKey = erlfdb_tuple:pack({?WATCHES_PENDING, Type}, Jobs),
+    erlfdb:watch(Tx, CounterKey).
diff --git a/src/couch_jobs/src/couch_jobs_server.erl b/src/couch_jobs/src/couch_jobs_server.erl
new file mode 100644
index 0000000..2e03c7d
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_server.erl
@@ -0,0 +1,193 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_server).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/0,
+    get_notifier_server/1,
+    force_check_types/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-define(TYPE_CHECK_PERIOD_DEFAULT, 15000).
+-define(MAX_JITTER_DEFAULT, 5000).
+
+
+% Start the server, registered locally under this module's name. The init
+% argument is unused.
+start_link() ->
+    RegName = {local, ?MODULE},
+    gen_server:start_link(RegName, ?MODULE, nil, []).
+
+
+% Look up the notifier pid registered for Type. If the type is not in the
+% table yet, force a type check on the server and look once more.
+%
+% Returns {ok, NotifierPid} or {error, not_found}.
+get_notifier_server(Type) ->
+    case lookup_notifier(Type) of
+        {ok, _} = Found ->
+            Found;
+        {error, not_found} ->
+            force_check_types(),
+            lookup_notifier(Type)
+    end.
+
+
+% Single table lookup translated to the public return shape.
+lookup_notifier(Type) ->
+    case get_type_pid_refs(Type) of
+        {{_, _}, {NotifierPid, _}} -> {ok, NotifierPid};
+        not_found -> {error, not_found}
+    end.
+
+
+% Synchronously ask the server to run check_types/0 now. Blocks without a
+% timeout until the server replies.
+force_check_types() ->
+    gen_server:call(?MODULE, check_types, infinity).
+
+
+% gen_server callback. Creates the named ETS table, runs an initial type check
+% and schedules the periodic one. The server state is unused (nil).
+init(_) ->
+    % couch_jobs_server is started after the notifier and activity monitor
+    % supervisors. If it restarts, there could be some stale notifiers or
+    % activity monitors left over. Kill those, as new ones are started later
+    % anyway.
+    reset_monitors(),
+    reset_notifiers(),
+    ets:new(?MODULE, [protected, named_table]),
+    check_types(),
+    schedule_check(),
+    {ok, nil}.
+
+
+% gen_server callback. Nothing to clean up.
+terminate(_, _St) ->
+    ok.
+
+
+% gen_server callback. `check_types` runs a type check and replies ok. Any
+% other call stops the server with {bad_call, Msg}, which is also sent back as
+% the reply.
+handle_call(check_types, _From, St) ->
+    check_types(),
+    {reply, ok, St};
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+% gen_server callback. No casts are expected; any cast stops the server with
+% {bad_cast, Msg}.
+handle_cast(Msg, St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+
+% gen_server callback.
+%
+%  - check_types: periodic timer message; run the check and re-arm the timer.
+%  - 'DOWN': a monitored process exited; log it and stop the server.
+%  - {Ref, ready}: stray erlfdb future message; log it and carry on.
+%  - anything else stops the server with {bad_info, Msg}.
+handle_info(check_types, St) ->
+    check_types(),
+    schedule_check(),
+    {noreply, St};
+
+handle_info({'DOWN', _Ref, process, Pid, Reason}, St) ->
+    LogMsg = "~p : process ~p exited with ~p",
+    couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+    {stop, {unexpected_process_exit, Pid, Reason}, St};
+
+handle_info({Ref, ready}, St) when is_reference(Ref) ->
+    % Don't crash couch_jobs_server, and with it the whole application, on a
+    % stray future-ready message. Eventually proper cleanup should be done in
+    % the erlfdb:wait timeout code instead.
+    LogMsg = "~p : spurious erlfdb future ready message ~p",
+    couch_log:error(LogMsg, [?MODULE, Ref]),
+    {noreply, St};
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+% gen_server callback. The state needs no conversion on code upgrade.
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+% Reconcile the ETS table with the types found in FDB: start monitors for
+% types present in FDB but not in ETS, and stop them for types present in ETS
+% but no longer in FDB.
+check_types() ->
+    InFdb = fdb_types(),
+    InEts = ets_types(),
+    Missing = InFdb -- InEts,
+    Stale = InEts -- InFdb,
+    lists:foreach(fun start_monitors/1, Missing),
+    lists:foreach(fun stop_monitors/1, Stale).
+
+
+% Start an activity monitor and a notifier for Type, monitor both processes
+% and record {Type, {MonPid, MonRef}, {NotifierPid, NotifierRef}} in the ETS
+% table. Raises an error if either child fails to start.
+start_monitors(Type) ->
+    MonRes = couch_jobs_activity_monitor_sup:start_monitor(Type),
+    MonPidRef = monitor_started(MonRes, failed_to_start_monitor, Type),
+    NotifierRes = couch_jobs_notifier_sup:start_notifier(Type),
+    NotifierPidRef = monitor_started(NotifierRes, failed_to_start_notifier,
+        Type),
+    ets:insert_new(?MODULE, {Type, MonPidRef, NotifierPidRef}).
+
+
+% Turn a start result into {Pid, MonitorRef}, or raise {ErrTag, Type, Error}.
+monitor_started(StartResult, ErrTag, Type) ->
+    case StartResult of
+        {ok, Pid} -> {Pid, monitor(process, Pid)};
+        {error, Error} -> error({ErrTag, Type, Error})
+    end.
+
+
+% Stop the activity monitor and notifier of Type and drop the type from the
+% ETS table. Crashes with a badmatch if the type is not in the table or if
+% either supervisor does not return ok.
+stop_monitors(Type) ->
+    {{MonPid, MonRef}, {NotifierPid, NotifierRef}} = get_type_pid_refs(Type),
+    ok = couch_jobs_activity_monitor_sup:stop_monitor(MonPid),
+    % flush removes a 'DOWN' message that may already be in the mailbox, so
+    % the intentional stop is not seen by handle_info/2
+    demonitor(MonRef, [flush]),
+    ok = couch_jobs_notifier_sup:stop_notifier(NotifierPid),
+    demonitor(NotifierRef, [flush]),
+    ets:delete(?MODULE, Type).
+
+
+% Stop every child currently running under the activity monitor supervisor.
+% Individual stop results are ignored.
+reset_monitors() ->
+    StopFun = fun couch_jobs_activity_monitor_sup:stop_monitor/1,
+    Pids = couch_jobs_activity_monitor_sup:get_child_pids(),
+    lists:foreach(StopFun, Pids).
+
+
+% Stop every child currently running under the notifier supervisor.
+% Individual stop results are ignored.
+reset_notifiers() ->
+    StopFun = fun couch_jobs_notifier_sup:stop_notifier/1,
+    Pids = couch_jobs_notifier_sup:get_child_pids(),
+    lists:foreach(StopFun, Pids).
+
+
+% Return {MonPidRef, NotifierPidRef} for Type from the ETS table, or
+% not_found if the type has no entry.
+get_type_pid_refs(Type) ->
+    Rows = ets:lookup(?MODULE, Type),
+    case Rows of
+        [] -> not_found;
+        [{_, MonPidRef, NotifierPidRef}] -> {MonPidRef, NotifierPidRef}
+    end.
+
+
+% Return the list of types currently present in the ETS table.
+%
+% ets:match/2 returns one single-element list per row. Those are unwrapped
+% with a comprehension rather than lists:flatten/1, which would also splice a
+% list-valued type into its individual elements.
+ets_types() ->
+    [Type || [Type] <- ets:match(?MODULE, {'$1', '_', '_'})].
+
+
+% Fetch the list of types via couch_jobs_fdb:get_types/1 inside a
+% transaction. An error:{timeout, _} raised while doing so is logged as a
+% warning and treated as "no types" by returning [].
+fdb_types() ->
+    try
+        couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+            couch_jobs_fdb:get_types(JTx)
+        end)
+    catch
+        error:{timeout, _} ->
+            couch_log:warning("~p : Timed out connecting to FDB", [?MODULE]),
+            []
+    end.
+
+
+% Schedule the next periodic `check_types` message to self.
+%
+% The delay is the configured check period plus a random jitter between 1 and
+% MaxJitter milliseconds. The jitter is capped by both half the period and
+% the configured "type_check_max_jitter_msec", so that setting acts as an
+% upper bound on the jitter.
+schedule_check() ->
+    Timeout = get_period_msec(),
+    MaxJitter = min(Timeout div 2, get_max_jitter_msec()),
+    % rand:uniform/1 needs an argument of at least 1
+    Wait = Timeout + rand:uniform(max(1, MaxJitter)),
+    erlang:send_after(Wait, self(), check_types).
+
+
+% Type check period in milliseconds, read from the "couch_jobs" config
+% section; defaults to ?TYPE_CHECK_PERIOD_DEFAULT.
+get_period_msec() ->
+    config:get_integer("couch_jobs", "type_check_period_msec",
+        ?TYPE_CHECK_PERIOD_DEFAULT).
+
+
+% Jitter setting in milliseconds for the periodic type check, read from the
+% "couch_jobs" config section; defaults to ?MAX_JITTER_DEFAULT.
+get_max_jitter_msec() ->
+    config:get_integer("couch_jobs", "type_check_max_jitter_msec",
+        ?MAX_JITTER_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_sup.erl b/src/couch_jobs/src/couch_jobs_sup.erl
new file mode 100644
index 0000000..d790237
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_sup.erl
@@ -0,0 +1,66 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1
+]).
+
+
+% Start the top-level couch_jobs supervisor, registered locally under this
+% module's name.
+start_link() ->
+    RegName = {local, ?MODULE},
+    supervisor:start_link(RegName, ?MODULE, []).
+
+
+% Supervisor callback. Children are listed in start order; with rest_for_one,
+% when a child terminates, the children started after it are restarted too.
+% couch_jobs_server is last, after both of the supervisors it drives.
+init([]) ->
+    FdbCache = #{
+        id => couch_jobs_fdb,
+        restart => transient,
+        start => {couch_jobs_fdb, init_cache, []}
+    },
+    ActivityMonitorSup = #{
+        id => couch_jobs_activity_monitor_sup,
+        restart => permanent,
+        shutdown => brutal_kill,
+        type => supervisor,
+        start => {couch_jobs_activity_monitor_sup, start_link, []}
+    },
+    NotifierSup = #{
+        id => couch_jobs_notifier_sup,
+        restart => permanent,
+        shutdown => brutal_kill,
+        type => supervisor,
+        start => {couch_jobs_notifier_sup, start_link, []}
+    },
+    Server = #{
+        id => couch_jobs_server,
+        restart => permanent,
+        shutdown => brutal_kill,
+        start => {couch_jobs_server, start_link, []}
+    },
+    SupFlags = #{
+        strategy => rest_for_one,
+        intensity => 3,
+        period => 10
+    },
+    {ok, {SupFlags, [FdbCache, ActivityMonitorSup, NotifierSup, Server]}}.
diff --git a/src/couch_jobs/src/couch_jobs_type_monitor.erl b/src/couch_jobs/src/couch_jobs_type_monitor.erl
new file mode 100644
index 0000000..562a866
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_type_monitor.erl
@@ -0,0 +1,84 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_type_monitor).
+
+
+-export([
+    start/4
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+% Loop state of the type monitor process.
+-record(st, {
+    % transaction context obtained from couch_jobs_fdb:get_jtx/0
+    jtx,
+    % job type being watched
+    type,
+    % last value seen from get_activity_vs_and_watch (presumably a
+    % versionstamp - confirm in couch_jobs_fdb)
+    vs,
+    % pid that receives {type_updated, VS} messages
+    parent,
+    % system time in milliseconds recorded by the last notify/1 call
+    timestamp,
+    % hold-off in milliseconds applied between notifications
+    holdoff,
+    % timeout passed to erlfdb:wait/2 when waiting on the watch
+    timeout
+}).
+
+
+% Spawn a process linked to the caller that watches Type and sends
+% {type_updated, VS} messages back to the caller. Returns the new pid.
+%
+% VS is the initially known value; no notification is sent until a different
+% one is seen. HoldOff and Timeout are stored in the loop state and used by
+% notify/1 and loop/1 respectively.
+start(Type, VS, HoldOff, Timeout) ->
+    Parent = self(),
+    spawn_link(fun() ->
+        loop(#st{
+            % fetched inside the spawned process, not in the caller
+            jtx = couch_jobs_fdb:get_jtx(),
+            type = Type,
+            vs = VS,
+            parent = Parent,
+            timestamp = 0,
+            holdoff = HoldOff,
+            timeout = Timeout
+        })
+    end).
+
+
+% Main loop: fetch the current value and a watch for the type, notify the
+% parent if the value differs from the one in the state, then wait on the
+% watch (for up to Timeout) before going around again. Never returns.
+loop(#st{vs = VS, timeout = Timeout} = St) ->
+    {St1, Watch} = case get_vs_and_watch(St) of
+        {VS1, W} when VS1 =/= VS -> {notify(St#st{vs = VS1}), W};
+        {VS, W} -> {St, W}
+    end,
+    try
+        erlfdb:wait(Watch, [{timeout, Timeout}])
+    catch
+        % NOTE(review): 1009 looks like FDB's future_version error code -
+        % confirm against the FDB error code list.
+        error:{erlfdb_error, 1009} ->
+            erlfdb:cancel(Watch, [flush]),
+            ok;
+        % Wait timed out: cancel the watch and poll again on the next pass
+        error:{timeout, _} ->
+            erlfdb:cancel(Watch, [flush]),
+            ok
+    end,
+    loop(St1).
+
+
+% Send {type_updated, VS} to the parent, rate-limited by the hold-off.
+%
+% If less than HoldOff milliseconds have passed since the previous
+% notification, sleep for the remainder first. Returns the state with the
+% timestamp set to the time the message was actually sent.
+notify(#st{} = St) ->
+    #st{holdoff = HoldOff, parent = Pid, timestamp = Ts, vs = VS} = St,
+    Dt = erlang:system_time(millisecond) - Ts,
+    case Dt < HoldOff of
+        true -> timer:sleep(HoldOff - Dt);
+        false -> ok
+    end,
+    Pid ! {type_updated, VS},
+    % Take the timestamp after the sleep. Recording the pre-sleep time would
+    % make the next hold-off window start too early and let two notifications
+    % go out less than HoldOff apart.
+    St#st{timestamp = erlang:system_time(millisecond)}.
+
+
+% Run couch_jobs_fdb:get_activity_vs_and_watch/2 for the type in a
+% transaction. loop/1 expects the result to be a {VS, Watch} pair.
+get_vs_and_watch(#st{jtx = JTx, type = Type}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_activity_vs_and_watch(JTx1, Type)
+    end).
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
new file mode 100644
index 0000000..a7e085e
--- /dev/null
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -0,0 +1,606 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_tests).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+
+% Job creation API can take an undefined Tx object
+% in that case it will start its own transaction
+-define(TX, undefined).
+
+
+% EUnit test generator. setup_couch/teardown_couch wrap the whole group once;
+% setup/teardown run around every individual test, and each test receives the
+% map returned by setup/0.
+couch_jobs_basic_test_() ->
+    {
+        "Test couch jobs basics",
+        {
+            setup,
+            fun setup_couch/0, fun teardown_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun add_remove_pending/1,
+                    fun add_remove_errors/1,
+                    fun get_job_data_and_state/1,
+                    fun resubmit_as_job_creator/1,
+                    fun type_timeouts_and_server/1,
+                    fun dead_notifier_restarts_jobs_server/1,
+                    fun bad_messages_restart_couch_jobs_server/1,
+                    fun bad_messages_restart_notifier/1,
+                    fun bad_messages_restart_activity_monitor/1,
+                    fun basic_accept_and_finish/1,
+                    fun accept_blocking/1,
+                    fun job_processor_update/1,
+                    fun resubmit_enqueues_job/1,
+                    fun resubmit_custom_schedtime/1,
+                    fun accept_max_schedtime/1,
+                    fun accept_no_schedule/1,
+                    fun subscribe/1,
+                    fun subscribe_wait_multiple/1,
+                    fun enqueue_inactive/1,
+                    fun remove_running_job/1,
+                    fun check_get_jobs/1,
+                    fun use_fabric_transaction_object/1
+                ]
+            }
+        }
+    }.
+
+
+% Group-level setup: start couch with the fabric application. The returned
+% context is handed to teardown_couch/1.
+setup_couch() ->
+    test_util:start_couch([fabric]).
+
+
+% Group-level teardown: stop couch and unload all meck mocks.
+teardown_couch(Ctx) ->
+    test_util:stop_couch(Ctx),
+    meck:unload().
+
+
+% Per-test setup: start couch_jobs, wipe existing job data and register two
+% job types with their timeouts. Returns the fixture map passed to each test.
+setup() ->
+    application:start(couch_jobs),
+    clear_jobs(),
+    T1 = {<<"t1">>, 1024}, % a complex type should work
+    T2 = 42, % a number should work as well
+    T1Timeout = 2,
+    T2Timeout = 3,
+    couch_jobs:set_type_timeout(T1, T1Timeout),
+    couch_jobs:set_type_timeout(T2, T2Timeout),
+    #{
+        t1 => T1,
+        t2 => T2,
+        t1_timeout => T1Timeout,
+        j1 => <<"j1">>,
+        j2 => <<"j2">>,
+        dbname => ?tempdb()
+    }.
+
+
+% Per-test teardown: wipe job data, stop couch_jobs, delete the temporary
+% database if a test created it, and unload all meck mocks.
+teardown(#{dbname := DbName}) ->
+    clear_jobs(),
+    application:stop(couch_jobs),
+    AllDbs = fabric2_db:list_dbs(),
+    case lists:member(DbName, AllDbs) of
+        true -> ok = fabric2_db:delete(DbName, []);
+        false -> ok
+    end,
+    meck:unload().
+
+
+% Clear every key that starts with the jobs path prefix, in one transaction.
+clear_jobs() ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        #{jobs_path := Jobs, tx := Tx} = JTx,
+        erlfdb:clear_range_startswith(Tx, Jobs)
+    end).
+
+
+% Restart the couch_jobs application and force an immediate type check on the
+% new couch_jobs_server.
+restart_app() ->
+    application:stop(couch_jobs),
+    application:start(couch_jobs),
+    couch_jobs_server:force_check_types().
+
+
+% Shortcut for couch_jobs_fdb:get_job/2, used by the tests to inspect a job.
+get_job(Type, JobId) ->
+    couch_jobs_fdb:get_job(Type, JobId).
+
+
+% Adding a job leaves it in the pending state and it can then be removed, for
+% both type shapes and both with and without an explicit transaction.
+add_remove_pending(#{t1 := T1, j1 := J1, t2 := T2, j2 := J2}) ->
+    ?_test(begin
+        ?assertEqual(ok, couch_jobs:add(?TX, T1, J1, #{})),
+        ?assertMatch(#{state := pending, data := #{}}, get_job(T1, J1)),
+        ?assertEqual(ok, couch_jobs:remove(?TX, T1, J1)),
+        % Data and numeric type should work as well. Also do it in a
+        % transaction
+        Data = #{<<"x">> => 42},
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:add(Tx, T2, J2, Data)
+        end)),
+        ?assertMatch(#{state := pending, data := Data}, get_job(T2, J2)),
+        ?assertEqual(ok, couch_jobs:remove(?TX, T2, J2))
+    end).
+
+
+% get_job_data/3 and get_job_state/3 return the stored values for an existing
+% job and {error, not_found} once the job has been removed.
+get_job_data_and_state(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        Data = #{<<"x">> => 42},
+        ok = couch_jobs:add(?TX, T, J, Data),
+        ?assertEqual({ok, Data}, couch_jobs:get_job_data(?TX, T, J)),
+        ?assertEqual({ok, pending}, couch_jobs:get_job_state(?TX, T, J)),
+        ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),
+        ?assertEqual({error, not_found}, couch_jobs:get_job_data(?TX, T, J)),
+        ?assertEqual({error, not_found}, couch_jobs:get_job_state(?TX, T, J))
+    end).
+
+
+% Error returns of add and remove: removing an unknown job, adding data that
+% cannot be JSON encoded, and adding to a type with no timeout set. Adding
+% the same job twice is accepted.
+add_remove_errors(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ?assertEqual({error, not_found}, couch_jobs:remove(?TX, 999, <<"x">>)),
+        ?assertMatch({error, {json_encoding_error, _}}, couch_jobs:add(?TX, T,
+            J, #{1 => 2})),
+        ?assertEqual({error, no_type_timeout}, couch_jobs:add(?TX, <<"x">>, J,
+            #{})),
+        ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})),
+        ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})),
+        ?assertEqual(ok, couch_jobs:remove(?TX, T, J))
+    end).
+
+
+% Re-adding an existing job from the creator side: the scheduled time is
+% updated while pending, a running job gets flagged for resubmission, and a
+% finished job is enqueued again.
+resubmit_as_job_creator(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        Data = #{<<"x">> => 42},
+        ok = couch_jobs:add(?TX, T, J, Data, 15),
+
+        % Job was pending, doesn't get resubmitted
+        ok = couch_jobs:add(?TX, T, J, Data, 16),
+        ?assertMatch(#{state := pending, stime := 16}, get_job(T, J)),
+
+        {ok, Job1, Data} = couch_jobs:accept(T),
+
+        % If is running, it gets flagged to be resubmitted
+        ok = couch_jobs:add(?TX, T, J, Data, 17),
+        ?assertMatch(#{state := running, stime := 17}, get_job(T, J)),
+        ?assertEqual(true, couch_jobs:is_resubmitted(get_job(T, J))),
+
+        ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+        % It should be pending according to the resubmit flag
+        ?assertMatch(#{state := pending, stime := 17}, get_job(T, J)),
+
+        % A finished job will be re-enqueued
+        {ok, Job2, _} = couch_jobs:accept(T),
+        ?assertEqual(ok, couch_jobs:finish(?TX, Job2)),
+        ?assertMatch(#{state := finished, stime := 17}, get_job(T, J)),
+        ok = couch_jobs:add(?TX, T, J, Data, 18),
+        ?assertMatch(#{state := pending, stime := 18}, get_job(T, J))
+    end).
+
+
+% The number of activity monitors and notifiers follows the set of types that
+% have a timeout: two from setup/0, three after adding <<"t3">>, and two again
+% once its timeout is cleared.
+type_timeouts_and_server(#{t1 := T, t1_timeout := T1Timeout}) ->
+    ?_test(begin
+        couch_jobs_server:force_check_types(),
+
+        ?assertEqual(T1Timeout, couch_jobs:get_type_timeout(T)),
+
+        ?assertEqual(2,
+            length(couch_jobs_activity_monitor_sup:get_child_pids())),
+        ?assertEqual(2, length(couch_jobs_notifier_sup:get_child_pids())),
+        ?assertMatch({ok, _}, couch_jobs_server:get_notifier_server(T)),
+
+        ?assertEqual(ok, couch_jobs:set_type_timeout(<<"t3">>, 8)),
+        couch_jobs_server:force_check_types(),
+        ?assertEqual(3,
+            length(couch_jobs_activity_monitor_sup:get_child_pids())),
+        ?assertEqual(3, length(couch_jobs_notifier_sup:get_child_pids())),
+
+        ?assertEqual(ok, couch_jobs:clear_type_timeout(<<"t3">>)),
+        couch_jobs_server:force_check_types(),
+        ?assertEqual(2,
+            length(couch_jobs_activity_monitor_sup:get_child_pids())),
+        ?assertEqual(2,
+            length(couch_jobs_notifier_sup:get_child_pids())),
+        ?assertMatch({error, _},
+            couch_jobs_server:get_notifier_server(<<"t3">>)),
+
+        ?assertEqual(not_found, couch_jobs:get_type_timeout(<<"t3">>))
+    end).
+
+
+% couch_jobs_server monitors its notifiers, so killing one must bring the
+% server down as well.
+dead_notifier_restarts_jobs_server(#{}) ->
+    ?_test(begin
+        couch_jobs_server:force_check_types(),
+
+        ServerPid = whereis(couch_jobs_server),
+        Ref = monitor(process, ServerPid),
+
+        [Notifier1, _Notifier2] = couch_jobs_notifier_sup:get_child_pids(),
+        exit(Notifier1, kill),
+
+        % Killing a notifier should kill the server as well
+        receive {'DOWN', Ref, _, _, _} -> ok end
+    end).
+
+
+% couch_jobs_server exits on an unexpected cast, call or info message. The
+% application is restarted after each case so the next one starts clean.
+bad_messages_restart_couch_jobs_server(#{}) ->
+    ?_test(begin
+        % couch_jobs_server dies on bad cast
+        ServerPid1 = whereis(couch_jobs_server),
+        Ref1 = monitor(process, ServerPid1),
+        gen_server:cast(ServerPid1, bad_cast),
+        receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+        restart_app(),
+
+        % couch_jobs_server dies on bad call
+        ServerPid2 = whereis(couch_jobs_server),
+        Ref2 = monitor(process, ServerPid2),
+        catch gen_server:call(ServerPid2, bad_call),
+        receive {'DOWN', Ref2, _, _, _} -> ok end,
+
+        restart_app(),
+
+        % couch_jobs_server dies on bad info
+        ServerPid3 = whereis(couch_jobs_server),
+        Ref3 = monitor(process, ServerPid3),
+        ServerPid3 ! a_random_message,
+        receive {'DOWN', Ref3, _, _, _} -> ok end,
+
+        restart_app()
+    end).
+
+
+% A notifier process exits on an unexpected cast, call or info message. The
+% pids come from couch_jobs_notifier_sup; the AMon* variable names are a
+% carry-over from the activity monitor test below.
+bad_messages_restart_notifier(#{}) ->
+    ?_test(begin
+        couch_jobs_server:force_check_types(),
+
+        % bad cast kills the notifier
+        [AMon1, _] = couch_jobs_notifier_sup:get_child_pids(),
+        Ref1 = monitor(process, AMon1),
+        gen_server:cast(AMon1, bad_cast),
+        receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+        restart_app(),
+
+        % bad call kills the notifier
+        [AMon2, _] = couch_jobs_notifier_sup:get_child_pids(),
+        Ref2 = monitor(process, AMon2),
+        catch gen_server:call(AMon2, bad_call),
+        receive {'DOWN', Ref2, _, _, _} -> ok end,
+
+        restart_app(),
+
+        % bad info message kills the notifier
+        [AMon3, _] = couch_jobs_notifier_sup:get_child_pids(),
+        Ref3 = monitor(process, AMon3),
+        AMon3 ! a_bad_message,
+        receive {'DOWN', Ref3, _, _, _} -> ok end,
+
+
+        restart_app()
+    end).
+
+
+% An activity monitor process exits on an unexpected cast, call or info
+% message. The application is restarted after each case.
+bad_messages_restart_activity_monitor(#{}) ->
+    ?_test(begin
+        couch_jobs_server:force_check_types(),
+
+        % bad cast kills the activity monitor
+        [AMon1, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+        Ref1 = monitor(process, AMon1),
+        gen_server:cast(AMon1, bad_cast),
+        receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+        restart_app(),
+
+        % bad call kills the activity monitor
+        [AMon2, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+        Ref2 = monitor(process, AMon2),
+        catch gen_server:call(AMon2, bad_call),
+        receive {'DOWN', Ref2, _, _, _} -> ok end,
+
+        restart_app(),
+
+        % bad info message kills activity monitor
+        [AMon3, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+        Ref3 = monitor(process, AMon3),
+        AMon3 ! a_bad_message,
+        receive {'DOWN', Ref3, _, _, _} -> ok end,
+
+        restart_app()
+    end).
+
+
+% Accepting a job moves it to running; finishing it with valid data moves it
+% to finished and stores the data. Data that cannot be JSON encoded is
+% rejected by finish.
+basic_accept_and_finish(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(?TX, T, J, #{}),
+        {ok, Job, #{}} = couch_jobs:accept(T),
+        ?assertMatch(#{state := running}, get_job(T, J)),
+        % check json validation for bad data in finish
+        ?assertMatch({error, {json_encoding_error, _}},
+            fabric2_fdb:transactional(fun(Tx) ->
+                couch_jobs:finish(Tx, Job, #{1 => 1})
+            end)),
+        Data = #{<<"x">> => 42},
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:finish(Tx, Job, Data)
+        end)),
+        ?assertMatch(#{state := finished, data := Data}, get_job(T, J))
+    end).
+
+
+% accept/1 blocks while there is no pending job and returns once one is
+% added. Each accept runs in its own process, which exits with the accept
+% result so it can be read from the 'DOWN' message.
+accept_blocking(#{t1 := T, j1 := J1, j2 := J2}) ->
+    ?_test(begin
+        Accept = fun() -> exit(couch_jobs:accept(T)) end,
+        WaitAccept = fun(Ref) ->
+            receive
+                {'DOWN', Ref, _, _, Res} -> Res
+            after
+                500 -> timeout
+            end
+        end,
+        {_, Ref1} = spawn_monitor(Accept),
+        ok = couch_jobs:add(?TX, T, J1, #{}),
+        ?assertMatch({ok, #{id := J1}, #{}}, WaitAccept(Ref1)),
+        {_, Ref2} = spawn_monitor(Accept),
+        % Nothing is pending, so the second accept must still be blocked
+        ?assertEqual(timeout, WaitAccept(Ref2)),
+        ok = couch_jobs:add(?TX, T, J2, #{}),
+        ?assertMatch({ok, #{id := J2}, #{}}, WaitAccept(Ref2))
+    end).
+
+
+% update/2,3 on a running job: data is replaced when given and kept when
+% omitted, data that cannot be JSON encoded is rejected without changing the
+% stored value, and finish/3 may set the final data.
+job_processor_update(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(?TX, T, J, #{}),
+        {ok, Job, #{}} = couch_jobs:accept(T),
+
+        % Use proper transactions in a few places here instead of passing in
+        % ?TX This is mostly to increase code coverage
+
+        ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:update(Tx, Job, #{<<"x">> => 1})
+        end)),
+
+        ?assertMatch(#{data := #{<<"x">> := 1}, state := running},
+            get_job(T, J)),
+
+        ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:update(Tx, Job)
+        end)),
+
+        ?assertMatch(#{data := #{<<"x">> := 1}, state := running},
+            get_job(T, J)),
+
+        ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:update(Tx, Job, #{<<"x">> => 2})
+        end)),
+
+        % check json validation for bad data in update
+        ?assertMatch({error, {json_encoding_error, _}},
+            fabric2_fdb:transactional(fun(Tx) ->
+                couch_jobs:update(Tx, Job, #{1 => 1})
+            end)),
+
+        ?assertMatch(#{data := #{<<"x">> := 2}, state := running},
+            get_job(T, J)),
+
+        % Finish may update the data as well
+        ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"x">> => 3})),
+        ?assertMatch(#{data := #{<<"x">> := 3}, state := finished},
+            get_job(T, J))
+    end).
+
+
+% A job resubmitted while running goes back to pending (with the given
+% scheduled time) when it is finished; finishing it again without a resubmit
+% leaves it finished.
+resubmit_enqueues_job(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(?TX, T, J, #{}),
+        {ok, Job1, #{}} = couch_jobs:accept(T),
+        ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6)),
+        ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+        ?assertMatch(#{state := pending, stime := 6}, get_job(T, J)),
+        {ok, Job2, #{}} = couch_jobs:accept(T),
+        ?assertEqual(ok, couch_jobs:finish(?TX, Job2)),
+        ?assertMatch(#{state := finished}, get_job(T, J))
+    end).
+
+
+% The scheduled time given to resubmit/3 replaces the one the job was
+% originally added with.
+resubmit_custom_schedtime(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{}, 7)),
+        {ok, Job, #{}} = couch_jobs:accept(T),
+        ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job, 9)),
+        ?assertEqual(ok, couch_jobs:finish(?TX, Job)),
+        ?assertMatch(#{stime := 9, state := pending}, get_job(T, J))
+    end).
+
+
+% accept/2 with max_sched_time only returns jobs scheduled at or before the
+% given time, and returns {error, not_found} when none qualify.
+accept_max_schedtime(#{t1 := T, j1 := J1, j2 := J2}) ->
+    ?_test(begin
+        ok = couch_jobs:add(?TX, T, J1, #{}, 5000),
+        ok = couch_jobs:add(?TX, T, J2, #{}, 3000),
+        ?assertEqual({error, not_found}, couch_jobs:accept(T,
+            #{max_sched_time => 1000})),
+        ?assertMatch({ok, #{id := J2}, _}, couch_jobs:accept(T,
+            #{max_sched_time => 3000})),
+        ?assertMatch({ok, #{id := J1}, _}, couch_jobs:accept(T,
+            #{max_sched_time => 9000}))
+    end).
+
+
+% With no_schedule => true every added job is accepted exactly once, in any
+% order. Combining no_schedule with max_sched_time is rejected.
+accept_no_schedule(#{t1 := T}) ->
+    ?_test(begin
+        JobCount = 25,
+        Jobs = [fabric2_util:uuid() || _ <- lists:seq(1, JobCount)],
+        [couch_jobs:add(?TX, T, J, #{}) || J <- Jobs],
+        InvalidOpts = #{no_schedule => true, max_sched_time => 1},
+        ?assertMatch({error, _}, couch_jobs:accept(T, InvalidOpts)),
+        AcceptOpts = #{no_schedule => true},
+        Accepted = [begin
+            {ok, #{id := J}, _} = couch_jobs:accept(T, AcceptOpts),
+            J
+        end || _ <- lists:seq(1, JobCount)],
+        % Order of acceptance is not fixed, so compare as sorted lists
+        ?assertEqual(lists:sort(Jobs), lists:sort(Accepted))
+    end).
+
+
+% Subscription life cycle for a single job: errors for an unknown type or job,
+% repeated subscribe and unsubscribe, updates delivered through wait/2,3 as
+% the job goes from pending to running to finished, and the results of
+% subscribing to a finished and then to a removed job.
+subscribe(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 1}),
+
+        ?assertEqual({error, not_found}, couch_jobs:subscribe(<<"xyz">>, J)),
+        ?assertEqual({error, not_found}, couch_jobs:subscribe(T, <<"j5">>)),
+
+        SubRes0 =  couch_jobs:subscribe(T, J),
+        ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes0),
+        {ok, SubId0, pending, _} = SubRes0,
+
+        SubRes1 = couch_jobs:subscribe(T, J),
+        ?assertEqual(SubRes0, SubRes1),
+
+        ?assertEqual(ok, couch_jobs:unsubscribe(SubId0)),
+
+        SubRes =  couch_jobs:subscribe(T, J),
+        ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes),
+        {ok, SubId, pending, _} = SubRes,
+
+        {ok, Job, _} = couch_jobs:accept(T),
+        ?assertMatch({T, J, running, #{<<"z">> := 1}},
+            couch_jobs:wait(SubId, 5000)),
+
+        % Make sure we get intermediate `running` updates
+        ?assertMatch({ok, _}, couch_jobs:update(?TX, Job, #{<<"z">> => 2})),
+        ?assertMatch({T, J, running, #{<<"z">> := 2}},
+            couch_jobs:wait(SubId, 5000)),
+
+        ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"z">> => 3})),
+        ?assertMatch({T, J, finished, #{<<"z">> := 3}},
+            couch_jobs:wait(SubId, finished, 5000)),
+
+        % No further updates are expected after the job has finished
+        ?assertEqual(timeout, couch_jobs:wait(SubId, 50)),
+
+        % Subscribing to a finished job returns no subscription id
+        ?assertEqual({ok, finished, #{<<"z">> => 3}},
+            couch_jobs:subscribe(T, J)),
+
+        ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),
+        ?assertEqual({error, not_found}, couch_jobs:subscribe(T, J))
+    end).
+
+
+% Subscribes to two jobs of the same type and waits on both subscriptions
+% at once, checking that each state change yields exactly one notification.
+subscribe_wait_multiple(#{t1 := T, j1 := J1, j2 := J2}) ->
+    ?_test(begin
+        ok = couch_jobs:add(?TX, T, J1, #{}),
+        ok = couch_jobs:add(?TX, T, J2, #{}),
+
+        {ok, SubA, pending, #{}} = couch_jobs:subscribe(T, J1),
+        {ok, SubB, pending, #{}} = couch_jobs:subscribe(T, J2),
+
+        BothSubs = [SubA, SubB],
+
+        % Accept one job. Only one running update is expected. FirstJob and
+        % SecondJob do not necessarily correspond to J1 and J2, they could
+        % be accepted as J2 and J1 respectively.
+        {ok, FirstJob, _} = couch_jobs:accept(T),
+        ?assertMatch({_, _, running, _}, couch_jobs:wait(BothSubs, 5000)),
+        ?assertMatch(timeout, couch_jobs:wait(BothSubs, 50)),
+
+        % Accept another job. Expect another update.
+        {ok, SecondJob, _} = couch_jobs:accept(T),
+        ?assertMatch({_, _, running, _}, couch_jobs:wait(BothSubs, 5000)),
+        ?assertMatch(timeout, couch_jobs:wait(BothSubs, 50)),
+
+        ?assertMatch({ok, _},
+            couch_jobs:update(?TX, FirstJob, #{<<"q">> => 5})),
+        ?assertMatch({ok, _},
+            couch_jobs:update(?TX, SecondJob, #{<<"r">> => 6})),
+
+        % Each job was updated once, expect two running updates.
+        ?assertMatch({_, _, running, _}, couch_jobs:wait(BothSubs, 5000)),
+        ?assertMatch({_, _, running, _}, couch_jobs:wait(BothSubs, 5000)),
+
+        % Finish one job. Expect one finished update only.
+        ?assertEqual(ok, couch_jobs:finish(?TX, FirstJob)),
+
+        ?assertMatch({_, _, finished, #{<<"q">> := 5}},
+            couch_jobs:wait(BothSubs, finished, 5000)),
+        ?assertMatch(timeout, couch_jobs:wait(BothSubs, finished, 50)),
+
+        % Finish the other job. However, unsubscribe should flush the
+        % message and we should not get it.
+        ?assertEqual(ok, couch_jobs:finish(?TX, SecondJob)),
+        ?assertEqual(ok, couch_jobs:unsubscribe(SubA)),
+        ?assertEqual(ok, couch_jobs:unsubscribe(SubB)),
+        ?assertMatch(timeout, couch_jobs:wait(BothSubs, finished, 50))
+    end).
+
+
+% Accepts a job and then never updates it. The subscription is expected to
+% report the job going back to `pending`, after which the original job
+% handle must be refused by update/2 and finish/2.
+enqueue_inactive(#{t1 := T, j1 := J, t1_timeout := Timeout}) ->
+    {timeout, 10, ?_test(begin
+        couch_jobs_server:force_check_types(),
+
+        ok = couch_jobs:add(?TX, T, J, #{<<"y">> => 1}),
+        {ok, StaleJob, _} = couch_jobs:accept(T),
+
+        {ok, Sub, running, #{<<"y">> := 1}} = couch_jobs:subscribe(T, J),
+
+        % Wait up to three times the type timeout; the * 1000 suggests
+        % Timeout is in seconds and wait/3 takes milliseconds (confirm).
+        MaxWait = 3 * Timeout * 1000,
+        Requeued = couch_jobs:wait(Sub, pending, MaxWait),
+        ?assertEqual({T, J, pending, #{<<"y">> => 1}}, Requeued),
+        ?assertMatch(#{state := pending}, get_job(T, J)),
+
+        % After job was re-enqueued, old job processor can't update it anymore
+        ?assertEqual({error, halt}, couch_jobs:update(?TX, StaleJob)),
+        ?assertEqual({error, halt}, couch_jobs:finish(?TX, StaleJob))
+    end)}.
+
+
+% Removes a job while it is running: the first remove succeeds, a second
+% one reports not_found, and the handle obtained from accept/1 is refused
+% by both update/2 and finish/2.
+remove_running_job(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(?TX, T, J, #{}),
+        {ok, RunningJob, _} = couch_jobs:accept(T),
+
+        FirstRemove = couch_jobs:remove(?TX, T, J),
+        ?assertEqual(ok, FirstRemove),
+
+        SecondRemove = couch_jobs:remove(?TX, T, J),
+        ?assertEqual({error, not_found}, SecondRemove),
+
+        % The accepted handle now refers to a job that no longer exists
+        ?assertEqual({error, halt}, couch_jobs:update(?TX, RunningJob)),
+        ?assertEqual({error, halt}, couch_jobs:finish(?TX, RunningJob))
+    end).
+
+
+% Checks couch_jobs_fdb:get_jobs/0 lists jobs across two types and reflects
+% the state change after one of them is accepted. The result is sorted so
+% the expected order (T2 entry first) is fixed by term ordering.
+check_get_jobs(#{t1 := T1, j1 := J1, t2 := T2, j2 := J2}) ->
+    ?_test(begin
+        ok = couch_jobs:add(?TX, T1, J1, #{}),
+        ok = couch_jobs:add(?TX, T2, J2, #{}),
+
+        % Both jobs start out pending
+        BeforeAccept = lists:sort(couch_jobs_fdb:get_jobs()),
+        ?assertMatch([
+            {T2, J2, pending, #{}},
+            {T1, J1, pending, #{}}
+        ], BeforeAccept),
+
+        {ok, _, _} = couch_jobs:accept(T1),
+
+        % Only the accepted T1 job switches to running
+        AfterAccept = lists:sort(couch_jobs_fdb:get_jobs()),
+        ?assertMatch([
+            {T2, J2, pending, #{}},
+            {T1, J1, running, #{}}
+        ], AfterAccept)
+    end).
+
+
+% Exercises the couch_jobs API with a fabric2 db handle passed where the
+% other tests pass ?TX: the job is added via the db handle and finished
+% inside a fabric2_fdb:transactional/2 fun that also writes two documents.
+use_fabric_transaction_object(#{t1 := T1, j1 := J1, dbname := DbName}) ->
+    ?_test(begin
+        {ok, Db} = fabric2_db:create(DbName, []),
+        ?assertEqual(ok, couch_jobs:add(Db, T1, J1, #{})),
+        ?assertMatch(#{state := pending, data := #{}}, get_job(T1, J1)),
+        {ok, AcceptedJob, _} = couch_jobs:accept(T1),
+
+        % Read job data, update two docs and finish the job, all using the
+        % handle the transactional wrapper hands to the fun.
+        InTx = fun(TxDb) ->
+            {ok, #{}} = couch_jobs:get_job_data(TxDb, T1, J1),
+            FirstDoc = #doc{id = <<"1">>, body = {[]}},
+            {ok, {_, _}} = fabric2_db:update_doc(TxDb, FirstDoc),
+            SecondDoc = #doc{id = <<"2">>, body = {[]}},
+            {ok, {_, _}} = fabric2_db:update_doc(TxDb, SecondDoc),
+            couch_jobs:finish(TxDb, AcceptedJob, #{<<"d">> => 1})
+        end,
+        ?assertEqual(ok, fabric2_fdb:transactional(Db, InTx)),
+
+        % Clean up; remove/3 is given a map whose tx is undefined
+        ok = couch_jobs:remove(#{tx => undefined}, T1, J1),
+        ok = fabric2_db:delete(DbName, [])
+    end).


Mime
View raw message