couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From beno...@apache.org
Subject [41/57] [abbrv] [partial] initial move to rebar compilation
Date Tue, 07 Jan 2014 00:37:01 GMT
http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch/src/json_stream_parse.erl
----------------------------------------------------------------------
diff --git a/apps/couch/src/json_stream_parse.erl b/apps/couch/src/json_stream_parse.erl
new file mode 100644
index 0000000..b63e011
--- /dev/null
+++ b/apps/couch/src/json_stream_parse.erl
@@ -0,0 +1,432 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(json_stream_parse).
+
+
+-export([events/2, to_ejson/1, collect_object/2]).
+
+-define(IS_WS(X), (X == $\  orelse X == $\t orelse X == $\n orelse X == $\r)).
+-define(IS_DELIM(X), (X == $} orelse X == $] orelse X == $,)).
+-define(IS_DIGIT(X), (X >= $0 andalso X =< $9)).
+
+
+
+% Parses the json into events.
+%
+% The DataFun param is a function that produces the data for parsing. When
+% called it must yield a tuple, or the atom done. The first element in the
+% tuple is the data itself, and the second element is a function to be called
+% next to get the next chunk of data in the stream.
+%
+% The EventFun is called every time a JSON element is parsed. It must return
+% a new function to be called for the next event.
+%
+% Events happen each time a new element in the json string is parsed.
+% For simple value types, the data itself is returned:
+% Strings
+% Integers
+% Floats
+% true
+% false
+% null
+%
+% For arrays, the start of the array is signaled by the event array_start
+% atom. The end is signaled by array_end. The events before the end are the
+% values, or nested values.
+%
+% For objects, the start of the object is signaled by the event object_start
+% atom. The end is signaled by object_end. Each key is signaled by
+% {key, KeyString}, and the following event is the value, or start of the
+% value (array_start, object_start).
+%
+% Normalizes the input before parsing: a list (string/iolist) is turned into
+% a binary, a binary is wrapped in a data fun that yields it once and then
+% reports done, and a data fun is parsed starting from an empty buffer.
+events(Input, EventFun) when is_list(Input) ->
+    Bin = list_to_binary(Input),
+    events(Bin, EventFun);
+events(Input, EventFun) when is_binary(Input) ->
+    Exhausted = fun() -> done end,
+    OneShot = fun() -> {Input, Exhausted} end,
+    events(OneShot, EventFun);
+events(DataFun, EventFun) ->
+    parse_one(DataFun, EventFun, <<>>).
+
+% Converts a complete JSON document (binary, iolist or data fun, as accepted
+% by events/2) directly to its Erlang representation (EJSON). Anything after
+% the first complete value is ignored.
+%
+% Throws {parse_error, Reason} on malformed input. Input that contains no
+% JSON value at all (e.g. <<>> or only whitespace) throws
+% {parse_error, unexpected_end_of_input} rather than an untagged badmatch.
+to_ejson(DF) ->
+    case events(DF, fun(Ev) -> collect_events(Ev, []) end) of
+    none ->
+        err(unexpected_end_of_input);
+    {_DF2, EF, _Rest} ->
+        % collect_events/2 hands back the events newest first, which is
+        % the order make_ejson/2 consumes them in.
+        [[EJson]] = make_ejson(EF(get_results), [[]]),
+        EJson
+    end.
+
+
+% This function is used to return complete objects while parsing streams.
+%
+% Return this function from inside an event function right after getting an
+% object_start event. It then collects the remaining events for that object
+% and converts it to the Erlang representation of JSON (EJSON).
+%
+% It then calls your ReturnControl function with the erlang object. Your
+% return control function then should yield another event function.
+%
+% This example stream parses an array of objects, calling
+% fun do_something_with_the_object/1 for each object.
+%
+%    ev_array(array_start) ->
+%        fun(Ev) -> ev_object_loop(Ev) end.
+%
+%    ev_object_loop(object_start) ->
+%        fun(Ev) ->
+%            json_stream_parse:collect_object(Ev,
+%                fun(Obj) ->
+%                    do_something_with_the_object(Obj),
+%                    fun(Ev2) -> ev_object_loop(Ev2) end
+%                end)
+%        end;
+%    ev_object_loop(array_end) ->
+%        ok.
+%
+%    % invoke the parse
+%    main() ->
+%        ...
+%        json_stream_parse:events(Data, fun(Ev) -> ev_array(Ev) end).
+
+collect_object(Ev, ReturnControl) ->
+    % object_start was consumed just before this fun was installed; record it
+    % as the oldest buffered event and start at nesting depth 0.
+    Seed = [object_start],
+    collect_object(Ev, 0, ReturnControl, Seed).
+
+
+
+% internal methods
+
+% Reads one complete JSON value from the stream and feeds its events to EF.
+%
+% DF is the data fun, EF the current event fun and Acc the bytes already
+% buffered but not yet tokenized. Returns none when the stream ends before
+% any token is found; otherwise {DF2, EF2, Rest}, where EF2 is the event fun
+% returned by the value's last event and Rest is the unconsumed buffer.
+% Containers are delegated to parse_object/3 and parse_array/3; their
+% closing event (object_end / array_end) is emitted here.
+parse_one(DF,EF,Acc) ->
+    case toke(DF, Acc) of
+    none ->
+        none;
+    {Token, DF2, Rest} ->
+        case Token of
+        "{" ->
+            EF2 = EF(object_start),
+            {DF3, EF3, Rest2} = parse_object(DF2, EF2, Rest),
+            {DF3, EF3(object_end), Rest2};
+        "[" ->
+            EF2 = EF(array_start),
+            {DF3, EF3, Rest2} = parse_array(DF2, EF2, Rest),
+            {DF3, EF3(array_end), Rest2};
+        Int when is_integer(Int)->
+            {DF2, EF(Int), Rest};
+        Float when is_float(Float)->
+            {DF2, EF(Float), Rest};
+        Atom when is_atom(Atom)->
+            {DF2, EF(Atom), Rest};
+        String when is_binary(String)->
+            {DF2, EF(String), Rest};
+        _OtherToken ->
+            % "}", "]", "," or ":" cannot start a value
+            err(unexpected_token)
+        end
+    end.
+
+% Like parse_one/3, but a value is mandatory: where parse_one/3 would
+% return none (end of input), this throws {parse_error, Error}.
+must_parse_one(DF, EF, Acc, Error) ->
+    case parse_one(DF, EF, Acc) of
+    {_, _, _} = Parsed ->
+        Parsed;
+    none ->
+        err(Error)
+    end.
+
+% Like toke/2, but a token is mandatory: where toke/2 would return none
+% (end of input), this throws {parse_error, Error}.
+must_toke(DF, Data, Error) ->
+    case toke(DF, Data) of
+    {_, _, _} = Tok ->
+        Tok;
+    none ->
+        err(Error)
+    end.
+
+% Returns the next token from the stream, or none if the data fun reports
+% done before any non-whitespace byte. A token is {Token, DF, Rest}, where
+% Token is one of the punctuation strings "{", "}", "[", "]", ",", ":"
+% (as lists), a binary for a JSON string, an integer or float for a number,
+% or one of the atoms true, false and null. Rest is the buffered data left
+% after the token. Unrecognized input throws {parse_error, bad_token}.
+toke(DF, <<>>) ->
+    % buffer exhausted: pull the next chunk from the data fun
+    case DF() of
+    done ->
+        none;
+    {Data, DF2} ->
+        toke(DF2, Data)
+    end;
+toke(DF, <<C,Rest/binary>>) when ?IS_WS(C)->
+    toke(DF, Rest);
+toke(DF, <<${,Rest/binary>>) ->
+    {"{", DF, Rest};
+toke(DF, <<$},Rest/binary>>) ->
+    {"}", DF, Rest};
+toke(DF, <<$[,Rest/binary>>) ->
+    {"[", DF, Rest};
+toke(DF, <<$],Rest/binary>>) ->
+    {"]", DF, Rest};
+toke(DF, <<$",Rest/binary>>) ->
+    toke_string(DF,Rest,[]);
+toke(DF, <<$,,Rest/binary>>) ->
+    {",", DF, Rest};
+toke(DF, <<$:,Rest/binary>>) ->
+    {":", DF, Rest};
+toke(DF, <<$-,Rest/binary>>) ->
+    % a minus sign must be followed by a digit; the digit may be in a later
+    % chunk, so make sure at least one more byte is buffered first
+    {<<C,_/binary>> = Data, DF2} = must_df(DF,1,Rest,expected_number),
+    case ?IS_DIGIT(C) of
+    true ->
+        toke_number_leading(DF2, Data, "-");
+    false ->
+        err(expected_number)
+    end;
+toke(DF, <<C,_/binary>> = Data) when ?IS_DIGIT(C) ->
+    toke_number_leading(DF, Data, []);
+% literals: the remaining letters of true/false/null may span chunks,
+% which must_match/3 handles
+toke(DF, <<$t,Rest/binary>>) ->
+    {Data, DF2} = must_match(<<"rue">>, DF, Rest),
+    {true, DF2, Data};
+toke(DF, <<$f,Rest/binary>>) ->
+    {Data, DF2} = must_match(<<"alse">>, DF, Rest),
+    {false, DF2, Data};
+toke(DF, <<$n,Rest/binary>>) ->
+    {Data, DF2} = must_match(<<"ull">>, DF, Rest),
+    {null, DF2, Data};
+toke(_, _) ->
+    err(bad_token).
+
+
+% Requires the stream to continue with exactly the bytes in Pattern (the
+% tail of a true/false/null literal), pulling more chunks if needed.
+% Returns {Remaining, DF2}; a mismatch or early end throws
+% {parse_error, bad_token}.
+must_match(Pattern, DF, Data) ->
+    Len = byte_size(Pattern),
+    {Buffered, DF2} = must_df(DF, Len, Data, bad_token),
+    case Buffered of
+    <<Pattern:Len/binary, Remaining/binary>> ->
+        {Remaining, DF2};
+    _ ->
+        err(bad_token)
+    end.
+
+% Pulls the next chunk from the data fun and returns {Chunk, NextDF};
+% the end of the stream throws {parse_error, Error}.
+must_df(DF, Error) ->
+    case DF() of
+    {_Chunk, _NextDF} = Next ->
+        Next;
+    done ->
+        err(Error)
+    end.
+
+
+% Ensures at least NeedLen bytes are buffered, appending chunks from the data
+% fun until there are. Returns {Buffer, DF}; Buffer may be longer than
+% NeedLen. Running out of data first throws {parse_error, Error}.
+must_df(DF, NeedLen, Acc, _Error) when byte_size(Acc) >= NeedLen ->
+    {Acc, DF};
+must_df(DF, NeedLen, Acc, Error) ->
+    case DF() of
+    done ->
+        err(Error);
+    {More, NextDF} ->
+        must_df(NextDF, NeedLen, <<Acc/binary, More/binary>>, Error)
+    end.
+
+
+% Parses the members of an object whose opening "{" has been consumed,
+% emitting {key, Key} followed by the value's events for each member.
+% Keys must be JSON strings. Returns {DF, EF, Rest} once the closing "}"
+% has been consumed; the caller emits object_end. Since the recursion after
+% "," also accepts "}", a trailing comma (e.g. {"a":1,}) is tolerated.
+parse_object(DF,EF,Acc) ->
+    case must_toke(DF, Acc, unterminated_object) of
+    {String, DF2, Rest} when is_binary(String)->
+        EF2 = EF({key,String}),
+        case must_toke(DF2,Rest,unterminated_object) of
+        {":", DF3, Rest2} ->
+            {DF4, EF3, Rest3} = must_parse_one(DF3, EF2, Rest2, expected_value),
+            % after a member: "," continues with the next key, "}" closes
+            case must_toke(DF4,Rest3, unterminated_object) of
+            {",", DF5, Rest4} ->
+                parse_object(DF5, EF3, Rest4);
+            {"}", DF5, Rest4} ->
+                {DF5, EF3, Rest4};
+            {_, _, _} ->
+                err(unexpected_token)
+            end;
+        _Else ->
+            err(expected_colon)
+        end;
+    {"}", DF2, Rest} ->
+        % empty object (or the end after a trailing comma)
+        {DF2, EF, Rest};
+    {_, _, _} ->
+        err(unexpected_token)
+    end.
+
+% Handles the separator after an array element: "]" closes the array,
+% "," hands over to parse_array/3 for the next element (which also accepts
+% "]", so a trailing comma is tolerated). Returns {DF, EF, Rest}.
+parse_array0(DF, EF, Acc) ->
+    Next = toke(DF, Acc),
+    case Next of
+    {"]", DF2, Rest} ->
+        {DF2, EF, Rest};
+    {",", DF2, Rest} ->
+        parse_array(DF2, EF, Rest);
+    none ->
+        err(unterminated_array);
+    _Other ->
+        err(unexpected_token)
+    end.
+
+% Parses an array element (or the closing "]") after the opening "[" or a
+% "," has been consumed. It mirrors parse_one/3, but after each element it
+% continues in parse_array0/3, which expects "," or "]". Returns
+% {DF, EF, Rest} once the closing "]" has been consumed; the caller emits
+% array_end. End of input throws {parse_error, unterminated_array}.
+parse_array(DF,EF,Acc) ->
+    case toke(DF, Acc) of
+    none ->
+         err(unterminated_array);
+    {Token, DF2, Rest} ->
+        case Token of
+        "{" ->
+            EF2 = EF(object_start),
+            {DF3, EF3, Rest2} = parse_object(DF2, EF2, Rest),
+            parse_array0(DF3, EF3(object_end), Rest2);
+        "[" ->
+            EF2 = EF(array_start),
+            {DF3, EF3, Rest2} = parse_array(DF2, EF2, Rest),
+            parse_array0(DF3, EF3(array_end), Rest2);
+        Int when is_integer(Int)->
+            parse_array0(DF2, EF(Int), Rest);
+        Float when is_float(Float)->
+            parse_array0(DF2, EF(Float), Rest);
+        Atom when is_atom(Atom)->
+            parse_array0(DF2, EF(Atom), Rest);
+        String when is_binary(String)->
+            parse_array0(DF2, EF(String), Rest);
+        "]" ->
+            % empty array (or the end after a trailing comma)
+            {DF2, EF, Rest};
+        _ ->
+            err(unexpected_token)
+        end
+    end.
+
+
+% Tokenizes a JSON string whose opening quote has already been consumed.
+% Acc holds the decoded bytes in reverse order. Returns
+% {StringBinary, DF, Rest}, where Rest is the data after the closing quote.
+%
+% \uXXXX escapes are decoded to UTF-8. A high surrogate (D800-DBFF) must be
+% directly followed by a \uXXXX low surrogate (DC00-DFFF), and the pair is
+% combined into one supplementary code point (this is how JSON encodes
+% characters outside the BMP). Unpaired surrogates and the non-characters
+% FFFE/FFFF throw {parse_error, invalid_utf_char}. An escape containing
+% anything but four hex digits throws {parse_error, bad_escape}.
+toke_string(DF, <<>>, Acc) ->
+    {Data, DF2} = must_df(DF, unterminated_string),
+    toke_string(DF2, Data, Acc);
+toke_string(DF, <<$\\,$",Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$" | Acc]);
+toke_string(DF, <<$\\,$\\,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\\ | Acc]);
+toke_string(DF, <<$\\,$/,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$/ | Acc]);
+toke_string(DF, <<$\\,$b,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\b | Acc]);
+toke_string(DF, <<$\\,$f,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\f | Acc]);
+toke_string(DF, <<$\\,$n,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\n | Acc]);
+toke_string(DF, <<$\\,$r,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\r | Acc]);
+toke_string(DF, <<$\\,$t,Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [$\t | Acc]);
+toke_string(DF, <<$\\,$u,Rest/binary>>, Acc) ->
+    {<<A,B,C,D,Data/binary>>, DF2} = must_df(DF,4,Rest,missing_hex),
+    UTFChar = hex4(A, B, C, D),
+    if UTFChar == 16#FFFF orelse UTFChar == 16#FFFE ->
+        err(invalid_utf_char);
+    UTFChar >= 16#DC00, UTFChar =< 16#DFFF ->
+        % a low surrogate is only valid directly after a high surrogate
+        err(invalid_utf_char);
+    UTFChar >= 16#D800, UTFChar =< 16#DBFF ->
+        {CodePoint, DF3, Data2} = toke_low_surrogate(DF2, Data, UTFChar),
+        toke_string(DF3, Data2, push_utf8(CodePoint, Acc));
+    true ->
+        toke_string(DF2, Data, push_utf8(UTFChar, Acc))
+    end;
+toke_string(DF, <<$\\>>, Acc) ->
+    % a backslash at the end of a chunk: fetch the escaped character
+    {Data, DF2} = must_df(DF, unterminated_string),
+    toke_string(DF2, <<$\\,Data/binary>>, Acc);
+toke_string(_DF, <<$\\, _/binary>>, _Acc) ->
+    err(bad_escape);
+toke_string(DF, <<$", Rest/binary>>, Acc) ->
+    {list_to_binary(lists:reverse(Acc)), DF, Rest};
+toke_string(DF, <<C, Rest/binary>>, Acc) ->
+    toke_string(DF, Rest, [C | Acc]).
+
+% Reads the "\uXXXX" low surrogate that must follow the high surrogate High
+% and returns {CodePoint, DF, Rest} for the combined supplementary character.
+toke_low_surrogate(DF, Data, High) ->
+    case must_df(DF, 6, Data, invalid_utf_char) of
+    {<<$\\,$u,A,B,C,D,Rest/binary>>, DF2} ->
+        Low = hex4(A, B, C, D),
+        if Low >= 16#DC00, Low =< 16#DFFF ->
+            CodePoint = 16#10000 + ((High - 16#D800) bsl 10) + (Low - 16#DC00),
+            {CodePoint, DF2, Rest};
+        true ->
+            err(invalid_utf_char)
+        end;
+    {_, _} ->
+        err(invalid_utf_char)
+    end.
+
+% Decodes four ASCII hex digits. list_to_integer/2 alone would also accept
+% a sign ("-001"), so every character is checked first.
+hex4(A, B, C, D) ->
+    Digits = [A, B, C, D],
+    case lists:all(fun is_hex_digit/1, Digits) of
+    true ->
+        erlang:list_to_integer(Digits, 16);
+    false ->
+        err(bad_escape)
+    end.
+
+is_hex_digit(X) when ?IS_DIGIT(X) -> true;
+is_hex_digit(X) when X >= $a, X =< $f -> true;
+is_hex_digit(X) when X >= $A, X =< $F -> true;
+is_hex_digit(_) -> false.
+
+% Prepends the UTF-8 encoding of CodePoint (never a surrogate here) to the
+% reversed byte accumulator.
+push_utf8(CodePoint, Acc) ->
+    lists:reverse(binary_to_list(<<CodePoint/utf8>>), Acc).
+
+
+% Reads the integer digits of a number. Acc holds the characters read so far
+% in reverse order (including a leading "-" pushed by toke/2). Whitespace, a
+% delimiter or the end of the stream completes an integer; "." continues in
+% toke_number_trailing/3 and "e"/"E" in toke_number_exponent/3. Leading
+% zeros are not rejected.
+toke_number_leading(DF, <<Digit,Rest/binary>>, Acc)
+        when ?IS_DIGIT(Digit) ->
+    toke_number_leading(DF, Rest, [Digit | Acc]);
+toke_number_leading(DF, <<C,_/binary>>=Rest, Acc)
+        when ?IS_WS(C) orelse ?IS_DELIM(C) ->
+    {list_to_integer(lists:reverse(Acc)), DF, Rest};
+toke_number_leading(DF, <<>>, Acc) ->
+    case DF() of
+    done ->
+         % end of stream: return a data fun that keeps reporting done
+         {list_to_integer(lists:reverse(Acc)), fun() -> done end, <<>>};
+    {Data, DF2} ->
+        toke_number_leading(DF2, Data, Acc)
+    end;
+toke_number_leading(DF, <<$., Rest/binary>>, Acc) ->
+    toke_number_trailing(DF, Rest, [$.|Acc]);
+% list_to_float/1 requires a fractional part, so "1e5" is accumulated as
+% "1.0e5"
+toke_number_leading(DF, <<$e, Rest/binary>>, Acc) ->
+    toke_number_exponent(DF, Rest, [$e, $0, $.|Acc]);
+toke_number_leading(DF, <<$E, Rest/binary>>, Acc) ->
+    toke_number_exponent(DF, Rest, [$e, $0, $.|Acc]);
+toke_number_leading(_, _, _) ->
+    err(unexpected_character_in_number).
+
+% Reads the fractional digits of a number after its decimal point, then an
+% optional exponent. Acc holds the characters read so far in reverse order,
+% so its head is $. until at least one fractional digit has been read.
+% A decimal point with no digit after it (e.g. "1." or "1.]") throws
+% {parse_error, unexpected_character_in_number} instead of reaching
+% list_to_float/1, which would fail with an untagged badarg.
+toke_number_trailing(DF, <<Digit,Rest/binary>>, Acc)
+        when ?IS_DIGIT(Digit) ->
+    toke_number_trailing(DF, Rest, [Digit | Acc]);
+toke_number_trailing(DF, <<C,_/binary>>=Rest, Acc)
+        when ?IS_WS(C) orelse ?IS_DELIM(C) ->
+    {fraction_to_float(Acc), DF, Rest};
+toke_number_trailing(DF, <<>>, Acc) ->
+    case DF() of
+    done ->
+        {fraction_to_float(Acc), fun() -> done end, <<>>};
+    {Data, DF2} ->
+        toke_number_trailing(DF2, Data, Acc)
+    end;
+toke_number_trailing(DF, <<"e", Rest/binary>>, [C|_]=Acc) when C /= $. ->
+    toke_number_exponent(DF, Rest, [$e|Acc]);
+toke_number_trailing(DF, <<"E", Rest/binary>>, [C|_]=Acc) when C /= $. ->
+    toke_number_exponent(DF, Rest, [$e|Acc]);
+toke_number_trailing(_, _, _) ->
+    err(unexpected_character_in_number).
+
+% Converts the reversed characters of a finished fractional number to a
+% float, rejecting a number that ends right after its decimal point.
+fraction_to_float([$. | _]) ->
+    err(unexpected_character_in_number);
+fraction_to_float(Acc) ->
+    list_to_float(lists:reverse(Acc)).
+
+
+% Reads the exponent of a number. Acc holds the characters read so far in
+% reverse order and starts with $e at its head; one "+" or "-" is allowed
+% directly after the "e". The exponent needs at least one digit: input such
+% as "1e", "1e+" or "1.5E-]" throws
+% {parse_error, unexpected_character_in_number} instead of reaching
+% list_to_float/1, which would fail with an untagged badarg.
+toke_number_exponent(DF, <<Digit,Rest/binary>>, Acc) when ?IS_DIGIT(Digit) ->
+    toke_number_exponent(DF, Rest, [Digit | Acc]);
+toke_number_exponent(DF, <<Sign,Rest/binary>>, [$e|_]=Acc)
+        when Sign == $+ orelse Sign == $- ->
+    toke_number_exponent(DF, Rest, [Sign | Acc]);
+toke_number_exponent(DF, <<C,_/binary>>=Rest, Acc)
+        when ?IS_WS(C) orelse ?IS_DELIM(C) ->
+    {exponent_to_float(Acc), DF, Rest};
+toke_number_exponent(DF, <<>>, Acc) ->
+    case DF() of
+    done ->
+        {exponent_to_float(Acc), fun() -> done end, <<>>};
+    {Data, DF2} ->
+        toke_number_exponent(DF2, Data, Acc)
+    end;
+toke_number_exponent(_, _, _) ->
+        err(unexpected_character_in_number).
+
+% Converts the reversed characters of a finished number with an exponent to
+% a float, rejecting an exponent that has a marker or sign but no digits.
+exponent_to_float([Last | _]) when Last == $e; Last == $+; Last == $- ->
+    err(unexpected_character_in_number);
+exponent_to_float(Acc) ->
+    list_to_float(lists:reverse(Acc)).
+
+
+% Aborts parsing by throwing {parse_error, Reason}; never returns.
+err(Reason) ->
+    erlang:throw({parse_error, Reason}).
+
+
+% Builds EJSON from a list of events given newest first (the order both
+% collect_events/2 and collect_object/4 accumulate in), so it reads the
+% document backwards. Stack is a list of frames; each frame lists the values
+% built so far for the container currently being assembled:
+%   - array_end / object_end (met first when reading backwards) push a new
+%     empty frame;
+%   - array_start pops the frame as a list, object_start pops it wrapped in
+%     a 1-tuple ({Proplist}); either result is pushed onto the parent frame;
+%   - {key, K} replaces the frame's head value V with {K, V};
+%   - any other event is a scalar and is pushed onto the current frame.
+% Because values are pushed while walking backwards, each frame ends up in
+% document order. Callers start with [[]] and get back [[Value]].
+make_ejson([], Stack) ->
+    Stack;
+make_ejson([array_start | RevEvs], [ArrayValues, PrevValues | RestStack]) ->
+    make_ejson(RevEvs, [[ArrayValues | PrevValues] | RestStack]);
+make_ejson([array_end | RevEvs], Stack) ->
+    make_ejson(RevEvs, [[] | Stack]);
+make_ejson([object_start | RevEvs], [ObjValues, PrevValues | RestStack]) ->
+    make_ejson(RevEvs, [[{ObjValues} | PrevValues] | RestStack]);
+make_ejson([object_end | RevEvs], Stack) ->
+    make_ejson(RevEvs, [[] | Stack]);
+make_ejson([{key, String} | RevEvs], [[PrevValue|RestObject] | RestStack] = _Stack) ->
+    make_ejson(RevEvs, [[{String, PrevValue}|RestObject] | RestStack]);
+make_ejson([Value | RevEvs], [Vals | RestStack] = _Stack) ->
+    make_ejson(RevEvs, [[Value | Vals] | RestStack]).
+
+% Event fun used by to_ejson/1. It accumulates every event (newest first)
+% and returns the accumulated list when called with get_results.
+collect_events(Ev, Acc) when Ev =:= get_results ->
+    Acc;
+collect_events(Ev, Acc) ->
+    NewAcc = [Ev | Acc],
+    fun(NextEv) -> collect_events(NextEv, NewAcc) end.
+
+
+% Event fun installed by collect_object/2. It buffers the events of one
+% object (newest first, the order make_ejson/2 consumes) until the
+% object_end that closes it. NestCount is the number of objects nested
+% inside it that are currently open. Only object_start/object_end change it,
+% because the parser emits an object's object_end only after every value
+% inside it, including arrays, has been closed. When the closing object_end
+% arrives, the buffered events are turned into EJSON, and the result of
+% ReturnControl(Obj) becomes the next event fun.
+collect_object(object_end, 0, ReturnControl, Acc) ->
+    [[Obj]] = make_ejson([object_end | Acc], [[]]),
+    ReturnControl(Obj);
+collect_object(object_end, NestCount, ReturnControl, Acc) ->
+    fun(Ev) ->
+        collect_object(Ev, NestCount - 1, ReturnControl, [object_end | Acc])
+    end;
+collect_object(object_start, NestCount, ReturnControl, Acc) ->
+    fun(Ev) ->
+        collect_object(Ev, NestCount + 1, ReturnControl, [object_start | Acc])
+    end;
+collect_object(Ev, NestCount, ReturnControl, Acc) ->
+    fun(Ev2) ->
+        collect_object(Ev2, NestCount, ReturnControl, [Ev | Acc])
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_collate/c_src/couch_collate.c
----------------------------------------------------------------------
diff --git a/apps/couch_collate/c_src/couch_collate.c b/apps/couch_collate/c_src/couch_collate.c
new file mode 100644
index 0000000..c5453d2
--- /dev/null
+++ b/apps/couch_collate/c_src/couch_collate.c
@@ -0,0 +1,281 @@
+/*
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of the
+License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed
+under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+
+*/
+
+#ifdef DARWIN
+#define U_HIDE_DRAFT_API 1
+#define U_DISABLE_RENAMING 1
+#endif
+
+#include "erl_nif.h"
+#include "unicode/ucol.h"
+#include "unicode/ucasemap.h"
+#include <stdio.h>
+#include <assert.h>
+
+static ERL_NIF_TERM ATOM_TRUE;
+static ERL_NIF_TERM ATOM_FALSE;
+static ERL_NIF_TERM ATOM_NULL;
+
+typedef struct {
+    ErlNifEnv* env;
+    int error;
+    UCollator* coll;
+} ctx_t;
+
+typedef struct {
+    UCollator** collators;
+    int collStackTop;
+    int numCollators;
+    ErlNifMutex* collMutex;
+} priv_data_t;
+
+static ERL_NIF_TERM collate_nif(ErlNifEnv*, int, const ERL_NIF_TERM []);
+static int collate_binary(priv_data_t*, ctx_t*, ERL_NIF_TERM, ERL_NIF_TERM, ERL_NIF_TERM);
+static int on_load(ErlNifEnv*, void**, ERL_NIF_TERM);
+static void on_unload(ErlNifEnv*, void*);
+static __inline void reserve_coll(priv_data_t*, ctx_t*);
+static __inline void release_coll(priv_data_t*, ctx_t*);
+int on_reload(ErlNifEnv*, void**, ERL_NIF_TERM);
+int on_upgrade(ErlNifEnv*, void**, void**, ERL_NIF_TERM);
+
+/*
+ * Takes a collator from the shared pool for the current call and stores it
+ * in ctx->coll; does nothing if ctx already holds one. The pool is used as
+ * a stack: the collator at index collStackTop is handed out and the top
+ * advances. collMutex serializes access to collStackTop. The assert (active
+ * only without NDEBUG) catches more simultaneous reservations than pooled
+ * collators. Every reservation must be paired with release_coll().
+ */
+void
+reserve_coll(priv_data_t* pData, ctx_t *ctx)
+{
+    if (ctx->coll == NULL) {
+        enif_mutex_lock(pData->collMutex);
+        assert(pData->collStackTop < pData->numCollators);
+        ctx->coll = pData->collators[pData->collStackTop];
+        pData->collStackTop += 1;
+        enif_mutex_unlock(pData->collMutex);
+    }
+}
+
+
+/*
+ * Returns ctx->coll to the shared pool filled by reserve_coll() and clears
+ * ctx->coll. Does nothing if ctx holds no collator.
+ *
+ * The collator is written back into the slot that becomes free. Only moving
+ * the stack top is not enough when calls finish out of order: if call A
+ * takes slot 0 and call B takes slot 1, and A finishes first, the next
+ * reservation would receive B's collator while B is still using it.
+ * UCollator objects must not be used concurrently, and collate_binary()
+ * even changes their strength, so each one needs a single owner at a time.
+ */
+void
+release_coll(priv_data_t* pData, ctx_t *ctx)
+{
+    if (ctx->coll != NULL) {
+        enif_mutex_lock(pData->collMutex);
+        pData->collStackTop -= 1;
+        assert(pData->collStackTop >= 0);
+        pData->collators[pData->collStackTop] = ctx->coll;
+        enif_mutex_unlock(pData->collMutex);
+        ctx->coll = NULL;
+    }
+}
+
+/* ------------------------------------------------------------------------- */
+
+/*
+ * couch_collate:collate_nif(A, B, Mode) -> -1 | 0 | 1.
+ *
+ * Compares A and B via collate_binary(). Any collator it reserved is always
+ * returned to the pool before returning. Raises badarg when A or B is not a
+ * binary or Mode is not an integer, instead of returning 0, which callers
+ * could not tell apart from "equal".
+ */
+static ERL_NIF_TERM
+collate_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
+    ERL_NIF_TERM term_a          = argv[0];
+    ERL_NIF_TERM term_b          = argv[1];
+    ERL_NIF_TERM term_has_nocase = argv[2];
+
+    ctx_t ctx;
+    int result;
+    priv_data_t* pData;
+
+    ctx.env = env;
+    ctx.error = 0;
+    ctx.coll = NULL;
+
+    pData = (priv_data_t*) enif_priv_data(env);
+
+    result = collate_binary(pData, &ctx, term_a, term_b, term_has_nocase);
+    release_coll(pData, &ctx);
+
+    if (ctx.error) {
+        return enif_make_badarg(env);
+    }
+
+    return enif_make_int(env, result);
+}
+
+/*
+ * Compares two UTF-8 binaries with a pooled ICU collator.
+ *
+ * term_has_nocase selects the mode: 0 = case-sensitive (the collator's
+ * default strength), 1 = case-insensitive (UCOL_PRIMARY strength for this
+ * call only). Returns the ucol_strcollIter() result (-1, 0 or 1), or -1 for
+ * any other mode. If an argument cannot be decoded, sets ctx->error and
+ * returns 0. A collator reserved here stays in ctx->coll; the caller must
+ * hand it back with release_coll().
+ */
+int
+collate_binary(priv_data_t* pData, ctx_t* ctx, ERL_NIF_TERM term_a, ERL_NIF_TERM term_b, ERL_NIF_TERM term_has_nocase)
+{
+    ErlNifBinary binA, binB;
+    int has_nocase, response;
+
+    if(!enif_get_int(ctx->env, term_has_nocase, &has_nocase)) {
+        ctx->error = 1;
+        return 0;
+    }
+    if(!enif_inspect_binary(ctx->env, term_a, &binA)) {
+        ctx->error = 1;
+        return 0;
+    }
+    if(!enif_inspect_binary(ctx->env, term_b, &binB)) {
+        ctx->error = 1;
+        return 0;
+    }
+
+    switch(has_nocase) {
+    case 0: /* COLLATE */
+    case 1: /* COLLATE_NO_CASE: */
+        {
+        UErrorCode status = U_ZERO_ERROR;
+        UCharIterator iterA;
+        UCharIterator iterB;
+
+        uiter_setUTF8(&iterA, (const char *) binA.data, (uint32_t) binA.size);
+        uiter_setUTF8(&iterB, (const char *) binB.data, (uint32_t) binB.size);
+
+        /* grab a collator */
+        reserve_coll(pData, ctx);
+
+        if (has_nocase == 1) /* switch this collator to case-insensitive */
+          ucol_setAttribute(ctx->coll, UCOL_STRENGTH, UCOL_PRIMARY, &status);
+
+        /* by default the collator compares case-sensitively */
+        response = ucol_strcollIter(ctx->coll, &iterA, &iterB, &status);
+
+        if (has_nocase == 1) {
+            /*
+             * Restore the default strength using a fresh status. ICU
+             * functions return immediately when handed a failure code, so
+             * reusing `status` after an earlier failure would skip the
+             * reset and leave this pooled collator case-insensitive for
+             * every later caller.
+             */
+            UErrorCode resetStatus = U_ZERO_ERROR;
+            ucol_setAttribute(ctx->coll, UCOL_STRENGTH, UCOL_DEFAULT, &resetStatus);
+        }
+
+        break;
+        }
+
+    default:
+        response = -1;
+    }
+
+    return response;
+}
+
+/* ------------------------------------------------------------------------- */
+
+/*
+ * NIF load callback. `info` is the load_info term given to erlang:load_nif/2
+ * and must be an integer >= 1: the number of collators to open up front
+ * (couch_collate:init/0 passes the scheduler count). Builds the collator pool
+ * and its mutex and installs them as the module's private data.
+ *
+ * Returns 0 on success. A non-zero code makes load_nif fail:
+ * 1 = info is not an integer, 2 = fewer than one collator requested,
+ * 3 = mutex creation failed, 4 = memory allocation failed,
+ * 5 = ucol_open() failed (collators opened so far are closed first).
+ */
+int
+on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM info)
+{
+    UErrorCode status = U_ZERO_ERROR;
+    priv_data_t* pData = (priv_data_t*)enif_alloc(sizeof(priv_data_t));
+    int i, j;
+
+    /* enif_alloc() returns NULL on failure; never dereference it unchecked */
+    if (pData == NULL) {
+        return 4;
+    }
+
+    /* Initialize the structure */
+    pData->collators = NULL;
+    pData->collStackTop = 0;
+    pData->numCollators = 0;
+    pData->collMutex = NULL;
+
+    if (!enif_get_int(env, info, &(pData->numCollators) )) {
+        enif_free((char*)pData);
+        return 1;
+    }
+
+    if (pData->numCollators < 1) {
+        enif_free((char*)pData);
+        return 2;
+    }
+
+    pData->collMutex = enif_mutex_create((char *)"coll_mutex");
+
+    if (pData->collMutex == NULL) {
+        enif_free((char*)pData);
+        return 3;
+    }
+
+    pData->collators = enif_alloc(sizeof(UCollator*) * pData->numCollators);
+
+    if (pData->collators == NULL) {
+        enif_mutex_destroy(pData->collMutex);
+        enif_free((char*)pData);
+        return 4;
+    }
+
+    for (i = 0; i < pData->numCollators; i++) {
+        /* "" selects ICU's root collation rules */
+        pData->collators[i] = ucol_open("", &status);
+
+        if (U_FAILURE(status)) {
+            for (j = 0; j < i; j++) {
+                ucol_close(pData->collators[j]);
+            }
+
+            enif_free(pData->collators);
+            enif_mutex_destroy(pData->collMutex);
+
+            enif_free((char*)pData);
+
+            return 5;
+        }
+    }
+
+    /* NOTE(review): these atoms are not used anywhere else in this file */
+    ATOM_TRUE = enif_make_atom(env, "true");
+    ATOM_FALSE = enif_make_atom(env, "false");
+    ATOM_NULL = enif_make_atom(env, "null");
+
+    *priv_data = pData;
+
+    return 0;
+}
+
+
+/*
+ * NIF unload callback: closes every pooled collator, then releases the pool
+ * array, the mutex and the private data allocated by on_load().
+ * NOTE(review): priv_data is dereferenced without a NULL check, so this
+ * assumes on_load() succeeded for this module instance.
+ */
+void
+on_unload(ErlNifEnv* env, void* priv_data)
+{
+    priv_data_t* pData = (priv_data_t*)priv_data;
+    if (pData->collators != NULL) {
+        int i;
+
+        for (i = 0; i < pData->numCollators; i++) {
+            ucol_close(pData->collators[i]);
+        }
+
+        enif_free(pData->collators);
+    }
+
+    if (pData->collMutex != NULL) {
+        enif_mutex_destroy(pData->collMutex);
+    }
+
+    enif_free((char*)pData);
+}
+
+/*
+ * Legacy reload callback: accepts the reload and leaves the existing
+ * private data untouched.
+ */
+int
+on_reload(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM info)
+{
+    return 0;
+}
+
+/*
+ * NIF upgrade callback, run when a new version of couch_collate is loaded
+ * while the old version is still present.
+ *
+ * *old_data belongs to the old module instance and is deliberately left
+ * alone. Processes may still be running the old code, and therefore
+ * collating with its pool. The VM passes that data to on_unload() when the
+ * old code is purged, which frees it. Freeing it here as well would be a
+ * use-after-free and a double free. The new instance gets a fresh pool
+ * from on_load().
+ */
+int
+on_upgrade(ErlNifEnv* env, void** priv_data, void** old_data, ERL_NIF_TERM info)
+{
+    (void) old_data;
+    return on_load(env, priv_data, info);
+}
+
+/* ------------------------------------------------------------------------- */
+
+/*
+ * Functions implemented by this library: collate_nif/3 replaces the Erlang
+ * stub couch_collate:collate_nif/3. ERL_NIF_INIT binds the library to the
+ * couch_collate module together with its load, reload, upgrade and unload
+ * callbacks.
+ */
+static ErlNifFunc
+nif_funcs[] =
+{
+    {"collate_nif", 3, collate_nif}
+};
+
+ERL_NIF_INIT(couch_collate, nif_funcs, &on_load, &on_reload, &on_upgrade, &on_unload)

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_collate/rebar.config
----------------------------------------------------------------------
diff --git a/apps/couch_collate/rebar.config b/apps/couch_collate/rebar.config
new file mode 100644
index 0000000..7bfd49e
--- /dev/null
+++ b/apps/couch_collate/rebar.config
@@ -0,0 +1,9 @@
+%% -*- tab-width: 4;erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%% ex: ft=erlang ts=4 sw=4 et
+
+{erl_opts, [
+        warnings_as_errors,
+        warn_export_all
+]}.
+
+{pre_hooks, [{clean, "rm -fr ebin priv erl_crash.dump"}]}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_collate/rebar.config.script
----------------------------------------------------------------------
diff --git a/apps/couch_collate/rebar.config.script b/apps/couch_collate/rebar.config.script
new file mode 100644
index 0000000..ff5ef9b
--- /dev/null
+++ b/apps/couch_collate/rebar.config.script
@@ -0,0 +1,46 @@
+%% -*- tab-width: 4;erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%% ex: ft=erlang ts=4 sw=4 et
+
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
+
+%% The NIF is built into an architecture-specific directory,
+%% priv/<system_architecture>/, which is where couch_collate:init/0 loads it
+%% from.
+Arch = erlang:system_info(system_architecture),
+
+%% Runs `icu-config Args` and returns its output with newlines stripped.
+%% Fails with badmatch if the command exits non-zero (e.g. when icu-config
+%% is not installed).
+ICUConfig = fun(Args) ->
+    {0, Value} = eunit_lib:command("icu-config " ++ Args),
+    [C||C <- Value, C =/= $\n]
+end,
+
+%% An environment variable, when set, overrides the icu-config query.
+GetFlag = fun(Name, Args) ->
+        case os:getenv(Name) of
+            false -> ICUConfig(Args);
+            Val -> Val
+        end
+    end,
+
+ICUCFLAGS = GetFlag("ICU_CFLAGS", "--cflags"),
+ICUCXXFLAGS = GetFlag("ICU_CXXFLAGS", "--cxxflags"),
+ICULDFLAGS = GetFlag("ICU_LDFLAGS", "--ldflags"),
+ICUINCPATH = GetFlag("ICU_INCPATH", "--cppflags-searchpath"),
+
+
+%% The ICU flags are prepended to any compiler/linker flags already present
+%% in the environment.
+PortEnv = [{port_env, [
+            {"CFLAGS",  ICUCFLAGS ++  " $CFLAGS " ++ ICUINCPATH},
+            {"CXXFLAGS", ICUCXXFLAGS ++ " $CXXFLAGS " ++ ICUINCPATH},
+            {"LDFLAGS",  ICULDFLAGS ++ " $LDFLAGS"}]},
+
+           {port_specs, [
+            {filename:join(["priv", Arch, "couch_collate.so"]),
+            ["c_src/*.c"]}]}
+],
+
+%% keymerge/3 keeps both entries when a key occurs in both lists, placing
+%% the PortEnv one first; presumably so this script's port settings win
+%% over rebar.config's. Confirm against rebar's lookup order.
+lists:keymerge(1,lists:keysort(1, PortEnv), lists:keysort(1, CONFIG)).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_collate/src/couch_collate.app.src
----------------------------------------------------------------------
diff --git a/apps/couch_collate/src/couch_collate.app.src b/apps/couch_collate/src/couch_collate.app.src
new file mode 100644
index 0000000..4541b67
--- /dev/null
+++ b/apps/couch_collate/src/couch_collate.app.src
@@ -0,0 +1,13 @@
+%% -*- tab-width: 4;erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%% ex: ft=erlang ts=4 sw=4 et
+
+{application, couch_collate,
+    [
+        {description, "couchdb collation module"},
+        {vsn, "0.1.0"},
+        {registered, []},
+        {applications, [kernel,
+                        stdlib]},
+        {included_applications, []},
+        {env, []}
+]}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_collate/src/couch_collate.erl
----------------------------------------------------------------------
diff --git a/apps/couch_collate/src/couch_collate.erl b/apps/couch_collate/src/couch_collate.erl
new file mode 100644
index 0000000..341510f
--- /dev/null
+++ b/apps/couch_collate/src/couch_collate.erl
@@ -0,0 +1,62 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_collate).
+
+-export([init/0]).
+-export([collate/2, collate/3]).
+
+-on_load(init/0).
+
+-type collate_options() :: [nocase].
+-export_type([collate_options/0]).
+
+%% on_load hook: loads the collation NIF from priv/<system_architecture>/.
+%% If the application's priv dir cannot be resolved (e.g. the code is not
+%% installed as an application), ../priv relative to the beam is used. The
+%% scheduler count is passed as load_info, and the C on_load() uses it as
+%% the size of its collator pool. Load errors are swallowed by the catch,
+%% so the module still loads; the Erlang collate_nif/3 stub then exits with
+%% couch_collate_not_loaded when called.
+init() ->
+    PrivDir = case code:priv_dir(?MODULE) of
+        {error, _} ->
+            EbinDir = filename:dirname(code:which(?MODULE)),
+            AppPath = filename:dirname(EbinDir),
+            filename:join(AppPath, "priv");
+        Path ->
+            Path
+    end,
+    NumScheds = erlang:system_info(schedulers),
+    Arch = erlang:system_info(system_architecture),
+    (catch erlang:load_nif(filename:join([PrivDir, Arch, ?MODULE]),
+                           NumScheds)),
+    %% NOTE(review): presumably R13B03's on_load expected `true` rather
+    %% than `ok` as the success value; confirm before dropping.
+    case erlang:system_info(otp_release) of
+        "R13B03" -> true;
+        _ -> ok
+    end.
+
+%% @doc Compares two strings using the default, case-sensitive collation.
+%% Returns -1 if A sorts before B, 0 if they are equal and 1 if A sorts
+%% after B. Same as collate(A, B, []).
+-spec collate(binary(), binary()) -> 0 | -1 | 1.
+collate(A, B) ->
+    NoOptions = [],
+    collate(A, B, NoOptions).
+
+%% @doc Compares two binaries with the collation NIF. Including the atom
+%% `nocase' in Options makes the comparison case-insensitive; other list
+%% elements are ignored. Returns -1, 0 or 1.
+-spec collate(binary(), binary(), collate_options()) -> 0 | -1 | 1.
+collate(A, B, Options) when is_binary(A), is_binary(B) ->
+    Mode = case lists:member(nocase, Options) of
+        false -> 0; % case-sensitive
+        true -> 1   % case-insensitive
+    end,
+    do_collate(A, B, Mode).
+
+%% @private
+%% Forwards to the NIF. Only mode 0 (case-sensitive) and mode 1
+%% (case-insensitive) are accepted.
+do_collate(BinaryA, BinaryB, Mode) when Mode =:= 0; Mode =:= 1 ->
+    collate_nif(BinaryA, BinaryB, Mode).
+
+%% NIF stub: replaced by the C implementation when init/0 loads the library.
+%% If loading failed, calling it exits with couch_collate_not_loaded.
+collate_nif(_BinaryA, _BinaryB, _HasCase) ->
+    exit(couch_collate_not_loaded).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_dbupdates/Makefile.am
----------------------------------------------------------------------
diff --git a/apps/couch_dbupdates/Makefile.am b/apps/couch_dbupdates/Makefile.am
new file mode 100644
index 0000000..d131b9b
--- /dev/null
+++ b/apps/couch_dbupdates/Makefile.am
@@ -0,0 +1,33 @@
+## Licensed under the Apache License, Version 2.0 (the "License"); you may not
+## use this file except in compliance with the License. You may obtain a copy of
+## the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+## License for the specific language governing permissions and limitations under
+## the License.
+
+couch_dbupdateslibdir = $(localerlanglibdir)/couch_dbupdates-0.1
+couch_dbupdatesebindir = $(couch_dbupdateslibdir)/ebin
+
+couch_dbupdatesebin_DATA = $(compiled_files)
+
+EXTRA_DIST = $(source_files)
+CLEANFILES = $(compiled_files)
+
+source_files = \
+    src/couch_dbupdates.erl \
+    src/couch_dbupdates.app.src \
+    src/couch_dbupdates_httpd.erl
+
+compiled_files = \
+    ebin/couch_dbupdates.beam \
+    ebin/couch_dbupdates_httpd.beam
+
+ebin/%.beam: src/%.erl
+	@mkdir -p ebin/
+	$(ERLC) -I$(top_srcdir)/src/couchdb -o ebin/ $(ERLC_FLAGS) ${TEST} $<;
+

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_dbupdates/src/couch_dbupdates.app.src
----------------------------------------------------------------------
diff --git a/apps/couch_dbupdates/src/couch_dbupdates.app.src b/apps/couch_dbupdates/src/couch_dbupdates.app.src
new file mode 100644
index 0000000..c420283
--- /dev/null
+++ b/apps/couch_dbupdates/src/couch_dbupdates.app.src
@@ -0,0 +1,11 @@
+{application, couch_dbupdates,
+ [
+  {description, ""},
+  {vsn, "@version@"},
+  {registered, []},
+  {applications, [
+                  kernel,
+                  stdlib
+                 ]},
+  {env, []}
+ ]}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_dbupdates/src/couch_dbupdates.erl
----------------------------------------------------------------------
diff --git a/apps/couch_dbupdates/src/couch_dbupdates.erl b/apps/couch_dbupdates/src/couch_dbupdates.erl
new file mode 100644
index 0000000..e37362f
--- /dev/null
+++ b/apps/couch_dbupdates/src/couch_dbupdates.erl
@@ -0,0 +1,46 @@
+-module(couch_dbupdates).
+
+-export([handle_dbupdates/3]).
+
+
+%% Streams database update events to Fun until it asks to stop.
+%%
+%% Starts a notifier that forwards each update to this process, then runs
+%% the receive loop; the notifier is stopped whether the loop returns or
+%% raises. Options must be exactly [{timeout, Ms}, {heartbeat, Bool}] (in
+%% that order). Returns the result of the final Fun(stop, Acc) call.
+handle_dbupdates(Fun, Acc, Options) ->
+    NotifierPid = db_update_notifier(),
+    try
+        loop(Fun, Acc, Options)
+    after
+        couch_db_update_notifier:stop(NotifierPid)
+    end.
+
+
+%% Waits for {db_updated, Event} messages and feeds them to Fun.
+%%
+%% Fun(Event, Acc) and Fun(heartbeat, Acc) return {ok, NewAcc} to continue
+%% or stop to finish. On finish, Fun(stop, Acc) is called with the
+%% accumulator from before the stopping call, and its result is returned.
+%% If nothing arrives within Timeout ms, a heartbeat is delivered when
+%% Heartbeat is true; otherwise the loop finishes.
+loop(Fun, Acc, Options) ->
+    [{timeout, Timeout}, {heartbeat, Heartbeat}] = Options,
+    Next = receive
+        {db_updated, Event} ->
+            {call, Event}
+    after Timeout ->
+        case Heartbeat of
+            true -> {call, heartbeat};
+            _ -> finish
+        end
+    end,
+    case Next of
+        finish ->
+            Fun(stop, Acc);
+        {call, What} ->
+            case Fun(What, Acc) of
+                {ok, Acc1} ->
+                    loop(Fun, Acc1, Options);
+                stop ->
+                    Fun(stop, Acc)
+            end
+    end.
+
+%% Starts a couch_db_update_notifier whose callback sends each event to the
+%% calling process as {db_updated, Event}. Returns the notifier pid and
+%% fails with badmatch if it cannot be started. Because it is started with
+%% start_link, it is presumably linked to the caller; verify in
+%% couch_db_update_notifier.
+db_update_notifier() ->
+    Self = self(),
+    {ok, Notifier} = couch_db_update_notifier:start_link(fun(Event) ->
+        Self ! {db_updated, Event}
+    end),
+    Notifier.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_dbupdates/src/couch_dbupdates_httpd.erl
----------------------------------------------------------------------
diff --git a/apps/couch_dbupdates/src/couch_dbupdates_httpd.erl b/apps/couch_dbupdates/src/couch_dbupdates_httpd.erl
new file mode 100644
index 0000000..ec0c4d6
--- /dev/null
+++ b/apps/couch_dbupdates/src/couch_dbupdates_httpd.erl
@@ -0,0 +1,69 @@
+-module(couch_dbupdates_httpd).
+
+-export([handle_req/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {resp, feed}).
+
+% HTTP handler for the database updates feed (server admins only).
+%
+% Query string parameters:
+%   feed      - "longpoll" (default), "continuous" or "eventsource"
+%   timeout   - milliseconds to wait for an event before a heartbeat or the
+%               end of the feed; defaults to "60000"
+%   heartbeat - "false" disables heartbeats; longpoll never sends them
+%
+% Only GET is accepted; every other method gets a 405 response.
+handle_req(#httpd{method='GET'}=Req) ->
+    ok = couch_httpd:verify_is_server_admin(Req),
+    Qs = couch_httpd:qs(Req),
+    Feed = proplists:get_value("feed", Qs, "longpoll"),
+
+    Timeout = list_to_integer(
+                proplists:get_value("timeout", Qs, "60000")
+    ),
+
+    Heartbeat0 = proplists:get_value("heartbeat", Qs),
+    Heartbeat = case {Feed, Heartbeat0} of
+        {"longpoll", _} -> false;
+        {_, "false"} -> false;
+        _ -> true
+    end,
+
+    Options = [{timeout, Timeout}, {heartbeat, Heartbeat}],
+
+    {ok, Resp} = case Feed of
+        "eventsource" ->
+            % Server-sent events need their own content type and must not
+            % be cached.
+            Headers = [
+                {"Content-Type", "text/event-stream"},
+                {"Cache-Control", "no-cache"}
+            ],
+            couch_httpd:start_json_response(Req, 200, Headers);
+        _ ->
+            couch_httpd:start_json_response(Req, 200)
+    end,
+
+    State = #state{resp=Resp, feed=Feed},
+    couch_dbupdates:handle_dbupdates(fun handle_update/2,
+                                     State, Options);
+handle_req(Req) ->
+    % Any method other than GET: 405 Method Not Allowed, advertising GET.
+    % This must be a clause of the exported handle_req/1; as a separate
+    % handle_req/2 it was unreachable.
+    couch_httpd:send_method_not_allowed(Req, "GET").
+
+% Callback handed to couch_dbupdates:handle_dbupdates/3.
+%
+% stop ends the JSON response and heartbeat writes a bare newline. Any other
+% value is a {Type, DbName} event, written according to the feed type:
+%   eventsource - "data: <json>\n\n"
+%   continuous  - "<json>\n"
+%   longpoll    - one {"ok": true, ...} object, after which the feed stops
+handle_update(stop, #state{resp=Resp}) ->
+    couch_httpd:end_json_response(Resp);
+handle_update(heartbeat, State) ->
+    send_data(State, "\n");
+handle_update(Event, #state{feed="eventsource"}=State) ->
+    Json = ?JSON_ENCODE(event_obj(Event)),
+    send_data(State, ["data: ", Json, "\n\n"]);
+handle_update(Event, #state{feed="continuous"}=State) ->
+    Json = ?JSON_ENCODE(event_obj(Event)),
+    send_data(State, [Json, $\n]);
+handle_update(Event, #state{resp=Resp, feed="longpoll"}) ->
+    {Props} = event_obj(Event),
+    Body = ?JSON_ENCODE({[{<<"ok">>, true} | Props]}),
+    couch_httpd:send_chunk(Resp, Body),
+    stop.
+
+% Writes Data as one chunk and keeps the returned response in the state.
+send_data(#state{resp=Resp}=State, Data) ->
+    {ok, NextResp} = couch_httpd:send_chunk(Resp, Data),
+    {ok, State#state{resp=NextResp}}.
+
+% Builds the EJSON object {"type": Type, "db_name": DbName} for an event.
+event_obj({Type, DbName}) ->
+    TypeBin = couch_util:to_binary(Type),
+    NameBin = couch_util:to_binary(DbName),
+    {[{<<"type">>, TypeBin}, {<<"db_name">>, NameBin}]}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_index/Makefile.am
----------------------------------------------------------------------
diff --git a/apps/couch_index/Makefile.am b/apps/couch_index/Makefile.am
new file mode 100644
index 0000000..2d4c593
--- /dev/null
+++ b/apps/couch_index/Makefile.am
@@ -0,0 +1,40 @@
+## Licensed under the Apache License, Version 2.0 (the "License"); you may not
+## use this file except in compliance with the License. You may obtain a copy of
+## the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+## License for the specific language governing permissions and limitations under
+## the License.
+
+# Installed as $(localerlanglibdir)/couch_index-0.1/ebin.
+couch_indexlibdir = $(localerlanglibdir)/couch_index-0.1
+couch_indexebindir = $(couch_indexlibdir)/ebin
+
+# Every file in compiled_files is installed into couch_indexebindir.
+couch_indexebin_DATA = $(compiled_files)
+
+# Sources go into the dist tarball; built files are removed by "make clean".
+EXTRA_DIST = $(source_files)
+CLEANFILES = $(compiled_files)
+
+source_files = \
+    src/couch_index.erl \
+    src/couch_index_api.erl \
+    src/couch_index.app.src \
+    src/couch_index_compactor.erl \
+    src/couch_index_server.erl \
+    src/couch_index_updater.erl \
+    src/couch_index_util.erl
+
+# NOTE(review): src/couch_index_api.erl and src/couch_index.app.src have no
+# counterpart below, so no couch_index_api.beam or couch_index.app is built
+# or installed from this file -- confirm that is intended.
+compiled_files = \
+    ebin/couch_index.beam \
+    ebin/couch_index_compactor.beam \
+    ebin/couch_index_server.beam \
+    ebin/couch_index_updater.beam \
+    ebin/couch_index_util.beam
+
+# Pattern rule: compile src/X.erl into ebin/X.beam, with
+# $(top_srcdir)/src/couchdb on the include path.
+ebin/%.beam: src/%.erl
+	@mkdir -p ebin/
+	$(ERLC) -I$(top_srcdir)/src/couchdb -o ebin/ $(ERLC_FLAGS) ${TEST} $<;
+

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_index/src/couch_index.app.src
----------------------------------------------------------------------
diff --git a/apps/couch_index/src/couch_index.app.src b/apps/couch_index/src/couch_index.app.src
new file mode 100644
index 0000000..141ed9d
--- /dev/null
+++ b/apps/couch_index/src/couch_index.app.src
@@ -0,0 +1,22 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_index, [
+    {description, "CouchDB Secondary Index Manager"},
+    {vsn, "@version@"},
+    % Every module compiled for this application must be listed here so that
+    % release tooling and code loading see the whole application.
+    {modules, [
+        couch_index,
+        couch_index_compactor,
+        couch_index_server,
+        couch_index_updater,
+        couch_index_util
+    ]},
+    {registered, [couch_index_server]},
+    {applications, [kernel, stdlib]}
+]}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_index/src/couch_index.erl
----------------------------------------------------------------------
diff --git a/apps/couch_index/src/couch_index.erl b/apps/couch_index/src/couch_index.erl
new file mode 100644
index 0000000..c09a110
--- /dev/null
+++ b/apps/couch_index/src/couch_index.erl
@@ -0,0 +1,338 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index).
+-behaviour(gen_server).
+
+
+%% API
+-export([start_link/1, stop/1, get_state/2, get_info/1]).
+-export([compact/1, compact/2, get_compactor_pid/1]).
+-export([config_change/3]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+% State of one index process.
+-record(st, {
+    % Callback module implementing the index (Mod:get/2, Mod:open/2, ...).
+    mod,
+    % Index state owned by mod; treated as opaque here.
+    idx_state,
+    % Pid of the couch_index_updater started in init/1.
+    updater,
+    % Pid of the couch_index_compactor started in init/1.
+    compactor,
+    % Callers blocked in get_state/2, as {From, RequestedSeq} pairs.
+    waiters=[],
+    % Milliseconds between commit attempts (commit_freq seconds * 1000).
+    commit_delay,
+    % false while idx_state has changes not yet written by Mod:commit/1.
+    committed=true,
+    % Set when the design doc changed or vanished while callers were still
+    % waiting; the process stops once the waiters have been served.
+    shutdown=false
+}).
+
+
+% Starts an index process for {Module, IdxState}, linked to the caller.
+% Returns {ok, Pid}, or the non-{ok, _} result of opening the index.
+start_link({_Module, _IdxState}=InitArg) ->
+    proc_lib:start_link(?MODULE, init, [InitArg]).
+
+
+% Asks the index process to stop normally; asynchronous, always returns ok.
+stop(IndexPid) ->
+    Request = stop,
+    gen_server:cast(IndexPid, Request).
+
+
+% Normally returns {ok, IdxState} once the index has reached at least
+% RequestSeq, running the updater if needed. Waits without a timeout.
+get_state(IndexPid, RequestSeq) ->
+    Request = {get_state, RequestSeq},
+    gen_server:call(IndexPid, Request, infinity).
+
+
+% Returns {ok, Info}: Mod's info proplist plus updater, compactor, commit and
+% waiter status. Uses the default 5 second call timeout.
+get_info(IndexPid) ->
+    gen_server:call(IndexPid, get_info, 5000).
+
+
+% Starts a compaction, or attaches to the one already running.
+% With the monitor option, returns {ok, MonitorRef} for the compaction
+% process; otherwise returns ok.
+compact(Pid) ->
+    compact(Pid, []).
+
+
+compact(Pid, Options) ->
+    {ok, CompactorPid} = gen_server:call(Pid, compact),
+    WantMonitor = lists:member(monitor, Options),
+    if
+        WantMonitor -> {ok, erlang:monitor(process, CompactorPid)};
+        true -> ok
+    end.
+
+
+% Returns {ok, Pid} of this index's couch_index_compactor server.
+get_compactor_pid(IndexPid) ->
+    Request = get_compactor_pid,
+    gen_server:call(IndexPid, Request).
+
+% Listener for query_server_config/commit_freq changes.
+%
+% config_change/3 is the historical entry point and targets ?MODULE. Index
+% processes are not registered under that name, so init/1 registers a
+% closure that calls config_change/4 with the index pid instead. The cast
+% tag is config_change, matching handle_cast/2. Other keys are ignored.
+config_change(Section, Key, Value) ->
+    config_change(?MODULE, Section, Key, Value).
+
+config_change(Dest, "query_server_config", "commit_freq", NewValue) ->
+    ok = gen_server:cast(Dest, {config_change, NewValue});
+config_change(_Dest, _Section, _Key, _Value) ->
+    ok.
+
+
+% proc_lib entry point for start_link/1.
+%
+% Opens the index through Mod inside the database, starts the updater and
+% compactor, acknowledges {ok, self()} and enters the gen_server loop. When
+% opening fails, the failure is acknowledged to the starter and this
+% process returns (exits) without entering the loop.
+init({Mod, IdxState}) ->
+    Self = self(),
+    % Bind config notifications to this pid; see config_change/4.
+    ok = couch_config:register(fun(Section, Key, Value) ->
+        config_change(Self, Section, Key, Value)
+    end),
+    DbName = Mod:get(db_name, IdxState),
+    Resp = couch_util:with_db(DbName, fun(Db) ->
+        case Mod:open(Db, IdxState) of
+            {ok, IdxSt} ->
+                couch_db:monitor(Db),
+                {ok, IdxSt};
+            Error ->
+                Error
+        end
+    end),
+    case Resp of
+        {ok, NewIdxState} ->
+            {ok, UPid} = couch_index_updater:start_link(self(), Mod),
+            {ok, CPid} = couch_index_compactor:start_link(self(), Mod),
+            Delay = couch_config:get("query_server_config", "commit_freq", "5"),
+            MsDelay = 1000 * list_to_integer(Delay),
+            State = #st{
+                mod=Mod,
+                idx_state=NewIdxState,
+                updater=UPid,
+                compactor=CPid,
+                commit_delay=MsDelay
+            },
+            Args = [
+                Mod:get(db_name, IdxState),
+                Mod:get(idx_name, IdxState),
+                couch_index_util:hexsig(Mod:get(signature, IdxState))
+            ],
+            ?LOG_INFO("Opening index for db: ~s idx: ~s sig: ~p", Args),
+            proc_lib:init_ack({ok, self()}),
+            gen_server:enter_loop(?MODULE, [], State);
+        Other ->
+            proc_lib:init_ack(Other)
+    end.
+
+
+% Closes the index, replies Reason to every pending get_state/2 caller,
+% stops the updater and compactor, then logs the shutdown.
+terminate(Reason, #st{mod=Mod, idx_state=IdxState}=State) ->
+    #st{waiters=Waiters, updater=Updater, compactor=Compactor} = State,
+    Mod:close(IdxState),
+    send_all(Waiters, Reason),
+    lists:foreach(fun couch_util:shutdown_sync/1, [Updater, Compactor]),
+    DbName = Mod:get(db_name, IdxState),
+    IdxName = Mod:get(idx_name, IdxState),
+    Sig = couch_index_util:hexsig(Mod:get(signature, IdxState)),
+    ?LOG_INFO("Closing index for db: ~s idx: ~s sig: ~p~nreason: ~p",
+              [DbName, IdxName, Sig, Reason]),
+    ok.
+
+
+% get_state: reply at once when the index has reached ReqSeq; otherwise
+% trigger the updater and park the caller in waiters until a new_state
+% cast brings the index up to ReqSeq (see send_replies/3).
+handle_call({get_state, ReqSeq}, From, State) ->
+    #st{
+        mod=Mod,
+        idx_state=IdxState,
+        waiters=Waiters
+    } = State,
+    IdxSeq = Mod:get(update_seq, IdxState),
+    case ReqSeq =< IdxSeq of
+        true ->
+            {reply, {ok, IdxState}, State};
+        _ -> % View update required
+            couch_index_updater:run(State#st.updater, IdxState),
+            Waiters2 = [{From, ReqSeq} | Waiters],
+            {noreply, State#st{waiters=Waiters2}, infinity}
+    end;
+% get_info: Mod's info proplist extended with updater, compactor, commit
+% and waiter status.
+handle_call(get_info, _From, State) ->
+    #st{mod=Mod} = State,
+    {ok, Info0} = Mod:get(info, State#st.idx_state),
+    IsUpdating = couch_index_updater:is_running(State#st.updater),
+    IsCompacting = couch_index_compactor:is_running(State#st.compactor),
+    Info = Info0 ++ [
+        {updater_running, IsUpdating},
+        {compact_running, IsCompacting},
+        {waiting_commit, State#st.committed == false},
+        {waiting_clients, length(State#st.waiters)}
+    ],
+    {reply, {ok, Info}, State};
+% reset: let Mod reset the index and adopt the state it returns.
+handle_call(reset, _From, State) ->
+    #st{
+        mod=Mod,
+        idx_state=IdxState
+    } = State,
+    {ok, NewIdxState} = Mod:reset(IdxState),
+    {reply, {ok, NewIdxState}, State#st{idx_state=NewIdxState}};
+% compact: start a compaction, or get the running one, via the compactor.
+handle_call(compact, _From, State) ->
+    Resp = couch_index_compactor:run(State#st.compactor, State#st.idx_state),
+    {reply, Resp, State};
+handle_call(get_compactor_pid, _From, State) ->
+    {reply, {ok, State#st.compactor}, State};
+% compacted: called by the compaction process with its result. The result
+% replaces the live state only if it is at least as up to date; otherwise
+% the compaction process is told to recompact.
+handle_call({compacted, NewIdxState}, _From, State) ->
+    #st{
+        mod=Mod,
+        idx_state=OldIdxState,
+        updater=Updater,
+        commit_delay=Delay
+    } = State,
+    assert_signature_match(Mod, OldIdxState, NewIdxState),
+    NewSeq = Mod:get(update_seq, NewIdxState),
+    OldSeq = Mod:get(update_seq, OldIdxState),
+    % For indices that require swapping files, we have to make sure we're
+    % up to date with the current index. Otherwise indexes could roll back
+    % (perhaps considerably) to previous points in history.
+    case NewSeq >= OldSeq of
+        true ->
+            {ok, NewIdxState1} = Mod:swap_compacted(OldIdxState, NewIdxState),
+            % Restart the indexer if it's running.
+            case couch_index_updater:is_running(Updater) of
+                true -> ok = couch_index_updater:restart(Updater, NewIdxState1);
+                false -> ok
+            end,
+            % The swapped-in state is uncommitted. Schedule a commit unless
+            % a commit timer is already pending (committed=false).
+            case State#st.committed of
+                true -> erlang:send_after(Delay, self(), commit);
+                false -> ok
+            end,
+            {reply, ok, State#st{
+                idx_state=NewIdxState1,
+                committed=false
+            }};
+        _ ->
+            {reply, recompact, State}
+    end.
+
+
+% Casts handled by an index process:
+%   {config_change, Secs} - new commit_freq (seconds, as a string)
+%   {updated, IdxState}   - the updater produced a new state
+%   {new_state, IdxState} - adopt a newer index state
+%   {update_error, Error} - fail every waiting get_state/2 caller
+%   stop / delete         - stop (delete first removes the index via Mod)
+%   ddoc_updated          - the design doc changed; maybe shut down
+% Any other cast stops the process with reason unhandled_cast.
+handle_cast({config_change, NewDelay}, State) ->
+    MsDelay = 1000 * list_to_integer(NewDelay),
+    {noreply, State#st{commit_delay=MsDelay}};
+handle_cast({updated, NewIdxState}, State) ->
+    {noreply, NewState} = handle_cast({new_state, NewIdxState}, State),
+    % A pending shutdown (see ddoc_updated) completes once nobody waits.
+    case NewState#st.shutdown andalso (NewState#st.waiters =:= []) of
+        true ->
+            {stop, normal, NewState};
+        false ->
+            maybe_restart_updater(NewState),
+            {noreply, NewState}
+    end;
+handle_cast({new_state, NewIdxState}, State) ->
+    #st{
+        mod=Mod,
+        idx_state=OldIdxState,
+        commit_delay=Delay
+    } = State,
+    assert_signature_match(Mod, OldIdxState, NewIdxState),
+    CurrSeq = Mod:get(update_seq, NewIdxState),
+    Args = [
+        Mod:get(db_name, NewIdxState),
+        Mod:get(idx_name, NewIdxState),
+        CurrSeq
+    ],
+    ?LOG_DEBUG("Updated index for db: ~s idx: ~s seq: ~B", Args),
+    Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState),
+    % committed=false means a commit timer is already pending.
+    case State#st.committed of
+        true -> erlang:send_after(Delay, self(), commit);
+        false -> ok
+    end,
+    {noreply, State#st{
+        idx_state=NewIdxState,
+        waiters=Rest,
+        committed=false
+    }};
+handle_cast({update_error, Error}, State) ->
+    send_all(State#st.waiters, Error),
+    {noreply, State#st{waiters=[]}};
+handle_cast(stop, State) ->
+    {stop, normal, State};
+handle_cast(delete, State) ->
+    #st{mod=Mod, idx_state=IdxState} = State,
+    ok = Mod:delete(IdxState),
+    {stop, normal, State};
+handle_cast(ddoc_updated, State) ->
+    #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
+    DbName = Mod:get(db_name, IdxState),
+    DDocId = Mod:get(idx_name, IdxState),
+    % Shut down when the design doc no longer exists or now yields a
+    % different index signature. Any not_found result counts as gone:
+    % {not_found, missing} is treated like {not_found, deleted} rather than
+    % crashing this process with case_clause.
+    Shutdown = couch_util:with_db(DbName, fun(Db) ->
+        case couch_db:open_doc(Db, DDocId, [ejson_body]) of
+            {not_found, _Reason} ->
+                true;
+            {ok, DDoc} ->
+                {ok, NewIdxState} = Mod:init(Db, DDoc),
+                Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState)
+        end
+    end),
+    case Shutdown of
+        true ->
+            % With callers still waiting, serve them first; the {updated, _}
+            % cast stops the process once waiters is empty.
+            case Waiters of
+                [] ->
+                    {stop, normal, State};
+                _ ->
+                    {noreply, State#st{shutdown = true}}
+            end;
+        false ->
+            {noreply, State#st{shutdown = false}}
+    end;
+handle_cast(_Mesg, State) ->
+    {stop, unhandled_cast, State}.
+
+
+% commit timer: a no-op when nothing is pending. Otherwise commit through Mod
+% only once the database has committed at least the index's update_seq, and
+% retry after commit_delay when it has not.
+% 'DOWN': e.g. the database monitored in init/1 went away; release every
+% waiter with shutdown and stop.
+handle_info(commit, #st{committed=true}=State) ->
+    {noreply, State};
+handle_info(commit, #st{mod=Mod, idx_state=IdxState, commit_delay=Delay}=State) ->
+    DbName = Mod:get(db_name, IdxState),
+    DbCommittedSeq = couch_util:with_db(DbName,
+        fun couch_db:get_committed_update_seq/1),
+    IdxSeq = Mod:get(update_seq, IdxState),
+    if
+        DbCommittedSeq >= IdxSeq ->
+            ok = Mod:commit(IdxState),
+            {noreply, State#st{committed=true}};
+        true ->
+            % The database has not durably committed everything the index
+            % has seen. Committing the index header now could leave the
+            % index permanently ahead of a database that loses those
+            % changes; losing uncommitted index work in a crash only costs
+            % re-indexing, so wait and try again.
+            erlang:send_after(Delay, self(), commit),
+            {noreply, State}
+    end;
+handle_info({'DOWN', _MRef, _Type, _Pid, _Info}, State) ->
+    #st{mod=Mod, idx_state=IdxState, waiters=Waiters} = State,
+    ?LOG_INFO("Index shutdown by monitor notice for db: ~s idx: ~s",
+              [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)]),
+    catch send_all(Waiters, shutdown),
+    {stop, normal, State#st{waiters=[]}}.
+
+
+% Hot code upgrade: the state is carried over unchanged.
+code_change(_PreviousVsn, State, _ExtraData) ->
+    {ok, State}.
+
+
+% Re-runs the updater while callers are still waiting for a later sequence.
+%
+% When the index uses the committed_only update option and the database has
+% updates beyond its committed sequence, a full commit is forced first.
+% Presumably this lets an updater restricted to committed changes make
+% progress; confirm against couch_index_updater.
+maybe_restart_updater(#st{waiters=[]}) ->
+    ok;
+maybe_restart_updater(#st{mod=Mod, idx_state=IdxState}=State) ->
+    couch_util:with_db(Mod:get(db_name, IdxState), fun(Db) ->
+        UpdateSeq = couch_db:get_update_seq(Db),
+        CommittedSeq = couch_db:get_committed_update_seq(Db),
+        HasUncommitted = UpdateSeq > CommittedSeq,
+        UOpts = Mod:get(update_options, IdxState),
+        % andalso: short-circuit instead of the strict `and` operator.
+        case HasUncommitted andalso lists:member(committed_only, UOpts) of
+            true -> couch_db:ensure_full_commit(Db);
+            false -> ok
+        end
+    end),
+    couch_index_updater:run(State#st.updater, IdxState).
+
+
+% Replies Reply to every {From, Seq} waiter, in list order, and returns the
+% list of gen_server:reply/2 results. Elements of any other shape are
+% skipped.
+send_all([], _Reply) ->
+    [];
+send_all([{From, _Seq} | Rest], Reply) ->
+    Sent = gen_server:reply(From, Reply),
+    [Sent | send_all(Rest, Reply)];
+send_all([_Other | Rest], Reply) ->
+    send_all(Rest, Reply).
+
+
+% Replies {ok, IdxState} to every waiter whose requested sequence is at or
+% below UpdateSeq, and returns the waiters that must keep waiting.
+send_replies(Waiters, UpdateSeq, IdxState) ->
+    IsSatisfied = fun({_From, Seq}) -> Seq =< UpdateSeq end,
+    {Ready, Pending} = lists:partition(IsSatisfied, Waiters),
+    lists:foreach(
+        fun({From, _Seq}) -> gen_server:reply(From, {ok, IdxState}) end,
+        Ready),
+    Pending.
+
+% Returns ok when Mod:get(signature, _) gives exactly equal signatures for
+% both states; otherwise raises error(signature_mismatch).
+assert_signature_match(Mod, OldIdxState, NewIdxState) ->
+    OldSig = Mod:get(signature, OldIdxState),
+    NewSig = Mod:get(signature, NewIdxState),
+    if
+        OldSig =:= NewSig -> ok;
+        true -> erlang:error(signature_mismatch)
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_index/src/couch_index_api.erl
----------------------------------------------------------------------
diff --git a/apps/couch_index/src/couch_index_api.erl b/apps/couch_index/src/couch_index_api.erl
new file mode 100644
index 0000000..9d3a67c
--- /dev/null
+++ b/apps/couch_index/src/couch_index_api.erl
@@ -0,0 +1,54 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_api).
+
+% Placeholder stubs naming the callbacks an index module provides to
+% couch_index, couch_index_server and couch_index_compactor. Nothing here is
+% exported and the bodies are not real implementations; the comments give
+% the return shapes the callers match on.
+
+% Returns a property of the index state. Fields requested by couch_index:
+% db_name, idx_name, signature, update_seq, update_options (a list) and
+% info (expected as {ok, Proplist}).
+get(Field, State) ->
+    ok.
+
+% Builds an index state from a design doc; couch_index_server expects
+% {ok, IdxState}.
+init(Db, Ddoc) ->
+    ok.
+
+% Opens the index; couch_index:init/1 expects {ok, IdxState}, and any other
+% value is handed back to the process that started the index.
+open(Db, State) ->
+    ok.
+
+% Called from couch_index:terminate/2; the return value is ignored.
+close(State) ->
+    ok.
+
+% Removes the index; couch_index matches the result against ok.
+delete(State) ->
+    ok.
+
+% couch_index expects {ok, NewIdxState}.
+reset(State) ->
+    ok.
+
+
+% Update-cycle callbacks. They are presumably driven by couch_index_updater,
+% whose update loop is not shown here; confirm the expected return values
+% there.
+start_update(State, PurgedState, NumChanges) ->
+    {ok, State}.
+
+purge(Db, PurgeSeq, PurgedIdRevs, State) ->
+    ok.
+
+process_doc(Doc, Seq, State) ->
+    ok.
+
+finish_update(State) ->
+    {ok, State}.
+
+% Persists the index; couch_index and couch_index_compactor expect ok.
+commit(State) ->
+    ok.
+
+
+% couch_index_compactor calls Mod:compact(Db, IdxState, Opts): the first
+% argument is a database handle (not a parent pid), Opts may contain
+% recompact, and the expected result is {ok, NewIdxState}.
+compact(Parent, State, Opts) ->
+    ok.
+
+% Switches to the compacted state; couch_index expects {ok, NewIdxState}.
+swap_compacted(OldState, NewState) ->
+    ok.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_index/src/couch_index_compactor.erl
----------------------------------------------------------------------
diff --git a/apps/couch_index/src/couch_index_compactor.erl b/apps/couch_index/src/couch_index_compactor.erl
new file mode 100644
index 0000000..6e9fb2e
--- /dev/null
+++ b/apps/couch_index/src/couch_index_compactor.erl
@@ -0,0 +1,113 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_compactor).
+-behaviour(gen_server).
+
+
+%% API
+-export([start_link/2, run/2, cancel/1, is_running/1]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+% Compactor server state.
+-record(st, {
+    % Pid of the owning couch_index process.
+    idx,
+    % Index callback module.
+    mod,
+    % Pid of the running compaction process; undefined when idle.
+    pid
+}).
+
+
+% Starts a compactor server for the index process Index, linked to the caller.
+start_link(Index, Module) ->
+    InitArg = {Index, Module},
+    gen_server:start_link(?MODULE, InitArg, []).
+
+
+% Starts compacting IdxState unless a compaction is already running;
+% returns {ok, CompactionPid} in both cases.
+run(CompactorPid, IdxState) ->
+    Request = {compact, IdxState},
+    gen_server:call(CompactorPid, Request).
+
+
+% Kills the running compaction, if any; returns ok.
+cancel(CompactorPid) ->
+    Request = cancel,
+    gen_server:call(CompactorPid, Request).
+
+
+% Returns true while a compaction process is running.
+is_running(CompactorPid) ->
+    Request = is_running,
+    gen_server:call(CompactorPid, Request).
+
+
+% Traps exits so the compaction process's exit arrives as an 'EXIT' message.
+init({Index, Module}) ->
+    process_flag(trap_exit, true),
+    State = #st{idx=Index, mod=Module},
+    {ok, State}.
+
+
+% Stops the compaction process before the compactor goes away (pid may be
+% undefined; presumably shutdown_sync ignores non-pids).
+terminate(_Reason, State) ->
+    #st{pid=Worker} = State,
+    couch_util:shutdown_sync(Worker),
+    ok.
+
+
+% {compact, IdxState}: return the running compaction process, or spawn a
+% linked one. cancel: kill the running compaction, if any. is_running:
+% whether a compaction process is recorded.
+handle_call({compact, IdxState}, _From, #st{idx=Idx, mod=Mod, pid=Running}=State) ->
+    case is_pid(Running) of
+        true ->
+            {reply, {ok, Running}, State};
+        false ->
+            Worker = spawn_link(fun() -> compact(Idx, Mod, IdxState) end),
+            {reply, {ok, Worker}, State#st{pid=Worker}}
+    end;
+handle_call(cancel, _From, #st{pid=undefined}=State) ->
+    {reply, ok, State};
+handle_call(cancel, _From, #st{pid=Worker}=State) ->
+    % Unlink first so the kill is not delivered back as an 'EXIT' message.
+    unlink(Worker),
+    exit(Worker, kill),
+    {reply, ok, State#st{pid=undefined}};
+handle_call(is_running, _From, #st{pid=Worker}=State) ->
+    {reply, is_pid(Worker), State}.
+
+
+% No casts belong to the compactor protocol; any cast stops the server.
+handle_cast(_Unexpected, State) ->
+    {stop, unknown_cast, State}.
+
+
+% 'EXIT' handling (exits are trapped in init/1):
+%   - compaction process finished normally: mark the compactor idle;
+%   - compaction process crashed: log the reason and mark the compactor
+%     idle instead of stopping with unknown_info, which would also take
+%     down the linked index process and hide the real reason;
+%   - any other normal exit is ignored;
+%   - the owning index process exited: stop.
+% Any other message stops the server with unknown_info.
+handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) ->
+    {noreply, State#st{pid=undefined}};
+handle_info({'EXIT', Pid, Reason}, #st{pid=Pid}=State) ->
+    ?LOG_ERROR("Compaction process ~p exited: ~p", [Pid, Reason]),
+    {noreply, State#st{pid=undefined}};
+handle_info({'EXIT', _Pid, normal}, State) ->
+    {noreply, State};
+handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) ->
+    {stop, normal, State};
+handle_info(_Mesg, State) ->
+    {stop, unknown_info, State}.
+
+
+% Hot code upgrade: the state is carried over unchanged.
+code_change(_PreviousVsn, State, _ExtraData) ->
+    {ok, State}.
+
+
+% Body of the compaction process.
+%
+% Compacts IdxState through Mod, commits the result and hands it to the
+% index with {compacted, _}. If the index answers recompact (its live state
+% moved ahead meanwhile), compaction runs again from the new state with the
+% recompact option.
+compact(Idx, Mod, IdxState) ->
+    compact(Idx, Mod, IdxState, []).
+
+compact(Idx, Mod, IdxState, Opts) ->
+    DbName = Mod:get(db_name, IdxState),
+    LogArgs = [DbName, Mod:get(idx_name, IdxState)],
+    ?LOG_INFO("Compaction started for db: ~s idx: ~s", LogArgs),
+    CompactFun = fun(Db) -> Mod:compact(Db, IdxState, Opts) end,
+    {ok, NewIdxState} = couch_util:with_db(DbName, CompactFun),
+    ok = Mod:commit(NewIdxState),
+    case gen_server:call(Idx, {compacted, NewIdxState}) of
+        recompact ->
+            ?LOG_INFO("Compaction restarting for db: ~s idx: ~s", LogArgs),
+            compact(Idx, Mod, NewIdxState, [recompact]);
+        _Done ->
+            ?LOG_INFO("Compaction finished for db: ~s idx: ~s", LogArgs),
+            ok
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_index/src/couch_index_server.erl
----------------------------------------------------------------------
diff --git a/apps/couch_index/src/couch_index_server.erl b/apps/couch_index/src/couch_index_server.erl
new file mode 100644
index 0000000..7a7bd55
--- /dev/null
+++ b/apps/couch_index/src/couch_index_server.erl
@@ -0,0 +1,201 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_server).
+-behaviour(gen_server).
+
+-export([start_link/0, get_index/4, get_index/3, get_index/2]).
+-export([config_change/2, update_notify/1]).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-define(BY_SIG, couchdb_indexes_by_sig).
+-define(BY_PID, couchdb_indexes_by_pid).
+-define(BY_DB, couchdb_indexes_by_db).
+
+
+% Server state. root_dir is the index root directory obtained from
+% couch_index_util:root_dir() in init/1; reset_indexes/2 removes a
+% database's index directory beneath it.
+-record(st, {root_dir}).
+
+% Starts the index server, registered locally under the module name.
+start_link() ->
+    Name = {local, ?MODULE},
+    gen_server:start_link(Name, ?MODULE, [], []).
+
+
+% Returns {ok, Pid} of the index process for a design doc, starting it when
+% needed.
+%
+% DbName may be a name (binary) or an open Db; DDoc may be a doc id (binary)
+% or an already loaded design doc. When Fun is a 1-arity fun it is applied
+% to the initial index state, must return {ok, Resp}, and the result
+% becomes {ok, Pid, Resp}. Errors from opening the design doc are returned
+% as is.
+get_index(Module, DbName, DDoc) ->
+    get_index(Module, DbName, DDoc, nil).
+
+
+get_index(Module, DbName, DDoc, Fun) when is_binary(DbName) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        get_index(Module, Db, DDoc, Fun)
+    end);
+get_index(Module, Db, DDocId, Fun) when is_binary(DDocId) ->
+    case couch_db:open_doc(Db, DDocId, [ejson_body]) of
+        {ok, Doc} -> get_index(Module, Db, Doc, Fun);
+        Error -> Error
+    end;
+get_index(Module, Db, DDoc, Fun) ->
+    {ok, InitState} = Module:init(Db, DDoc),
+    case is_function(Fun, 1) of
+        true ->
+            {ok, FunResp} = Fun(InitState),
+            {ok, Pid} = get_index(Module, InitState),
+            {ok, Pid, FunResp};
+        false ->
+            get_index(Module, InitState)
+    end.
+
+
+get_index(Module, IdxState) ->
+    Key = {Module:get(db_name, IdxState), Module:get(signature, IdxState)},
+    {DbName, Sig} = Key,
+    % Fast path: read the protected table directly and only ask the server
+    % when the index is not open yet (or is still being opened).
+    case ets:lookup(?BY_SIG, Key) of
+        [{_, Pid}] when is_pid(Pid) ->
+            {ok, Pid};
+        _NotOpen ->
+            Request = {get_index, {Module, IdxState, DbName, Sig}},
+            gen_server:call(?MODULE, Request, infinity)
+    end.
+
+
+% Creates the lookup tables, subscribes to config and db update events and
+% prepares the index root directory. Exits are trapped so that index and
+% helper process exits reach handle_info/2.
+init([]) ->
+    process_flag(trap_exit, true),
+    couch_config:register(fun ?MODULE:config_change/2),
+    Tables = [
+        % {DbName, Sig} -> Pid, or the list of waiting callers while opening
+        {?BY_SIG, protected, set},
+        % Pid -> {DbName, Sig}
+        {?BY_PID, private, set},
+        % DbName -> {DDocId, Sig} for every open index of that database
+        {?BY_DB, protected, bag}
+    ],
+    lists:foreach(fun({Name, Access, Type}) ->
+        ets:new(Name, [Access, Type, named_table])
+    end, Tables),
+    couch_db_update_notifier:start_link(fun ?MODULE:update_notify/1),
+    Root = couch_index_util:root_dir(),
+    couch_file:init_delete_dir(Root),
+    {ok, #st{root_dir=Root}}.
+
+
+% Synchronously shuts down every index process the server has recorded.
+terminate(_Reason, _State) ->
+    lists:foreach(fun({Pid, _}) -> couch_util:shutdown_sync(Pid) end,
+                  ets:tab2list(?BY_PID)),
+    ok.
+
+
+% get_index: reply with an open index, queue behind one that is opening, or
+% start opening it in a linked helper (new_index/1) and queue the caller.
+% async_open / async_error: outcome reported by new_index/1; answer every
+% queued caller and record (or forget) the index.
+% reset_indexes: synchronous variant of the reset_indexes cast.
+handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) ->
+    Key = {DbName, Sig},
+    case ets:lookup(?BY_SIG, Key) of
+        [{_, Pid}] when is_pid(Pid) ->
+            {reply, {ok, Pid}, State};
+        [{_, Waiters}] when is_list(Waiters) ->
+            ets:insert(?BY_SIG, {Key, [From | Waiters]}),
+            {noreply, State};
+        [] ->
+            spawn_link(fun() -> new_index(Args) end),
+            ets:insert(?BY_SIG, {Key, [From]}),
+            {noreply, State}
+    end;
+handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}=Reply}, _From, State) ->
+    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    lists:foreach(fun(From) -> gen_server:reply(From, Reply) end, Waiters),
+    link(Pid),
+    add_to_ets(DbName, Sig, DDocId, Pid),
+    {reply, ok, State};
+handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
+    Key = {DbName, Sig},
+    [{_, Waiters}] = ets:lookup(?BY_SIG, Key),
+    lists:foreach(fun(From) -> gen_server:reply(From, Error) end, Waiters),
+    ets:delete(?BY_SIG, Key),
+    {reply, ok, State};
+handle_call({reset_indexes, DbName}, _From, #st{root_dir=Root}=State) ->
+    reset_indexes(DbName, Root),
+    {reply, ok, State}.
+
+
+% Asynchronous reset_indexes, sent by update_notify/1.
+handle_cast({reset_indexes, DbName}, #st{root_dir=Root}=State) ->
+    reset_indexes(DbName, Root),
+    {noreply, State}.
+
+
+% Exit of a linked process (exits are trapped in init/1).
+handle_info({'EXIT', Pid, Reason}, Server) ->
+    handle_exit(ets:lookup(?BY_PID, Pid), Pid, Reason),
+    {noreply, Server}.
+
+% A known index exited: drop its entries from all three tables. An unknown
+% pid exiting abnormally takes the server down with the same reason; other
+% normal exits (such as finished new_index/1 helpers) are ignored.
+handle_exit([{Pid, {DbName, Sig}}], Pid, _Reason) ->
+    [{DbName, {DDocId, Sig}}] =
+        ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
+    rem_from_ets(DbName, Sig, DDocId, Pid);
+handle_exit([], _Pid, Reason) when Reason =/= normal ->
+    exit(Reason);
+handle_exit(_Found, _Pid, _Reason) ->
+    ok.
+
+
+% Hot code upgrade: the state is carried over unchanged.
+code_change(_PreviousVsn, State, _ExtraData) ->
+    {ok, State}.
+
+
+% Runs in a helper process spawned by handle_call/3. Starts the index and
+% reports the outcome to the server. On success the helper unlinks from the
+% index, which the server links to itself in async_open.
+new_index({Mod, IdxState, DbName, Sig}) ->
+    Tag = {DbName, Mod:get(idx_name, IdxState), Sig},
+    case couch_index:start_link({Mod, IdxState}) of
+        {ok, Pid} ->
+            ok = gen_server:call(?MODULE, {async_open, Tag, {ok, Pid}}),
+            unlink(Pid);
+        Error ->
+            ok = gen_server:call(?MODULE, {async_error, Tag, Error})
+    end.
+
+
+% Deletes every open index of DbName, waiting for each index process to go
+% down, then removes the database's index directory below Root.
+reset_indexes(DbName, Root) ->
+    lists:foreach(fun({_, {DDocId, Sig}}) ->
+        [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+        Ref = erlang:monitor(process, Pid),
+        gen_server:cast(Pid, delete),
+        receive
+            {'DOWN', Ref, _, _, _} -> ok
+        end,
+        rem_from_ets(DbName, Sig, DDocId, Pid)
+    end, ets:lookup(?BY_DB, DbName)),
+    couch_file:nuke_dir(Root, couch_index_util:index_dir("", DbName)).
+
+
+% Records an open index in all three lookup tables.
+add_to_ets(DbName, Sig, DDocId, Pid) ->
+    Key = {DbName, Sig},
+    ets:insert(?BY_SIG, {Key, Pid}),
+    ets:insert(?BY_PID, {Pid, Key}),
+    ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
+
+
+% Forgets an index in all three lookup tables.
+rem_from_ets(DbName, Sig, DDocId, Pid) ->
+    Key = {DbName, Sig},
+    ets:delete(?BY_SIG, Key),
+    ets:delete(?BY_PID, Pid),
+    ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}).
+
+
+% couch_config listener: a change of the index directory sends an exit
+% signal with reason config_change to the server. The server traps exits,
+% so the signal arrives as an 'EXIT' message from an unknown pid, and
+% handle_info/2 then calls exit(config_change).
+config_change("couchdb", Key) when Key =:= "view_index_dir";
+                                   Key =:= "index_dir" ->
+    exit(whereis(?MODULE), config_change).
+
+
+% couch_db_update_notifier callback, invoked by the notifier process started
+% in init/1. It only reads the protected tables and sends casts:
+% created/deleted databases get their indexes reset, and an updated design
+% doc is reported to each index opened for it.
+update_notify({Event, DbName}) when Event =:= deleted; Event =:= created ->
+    gen_server:cast(?MODULE, {reset_indexes, DbName});
+update_notify({ddoc_updated, {DbName, DDocId}}) ->
+    Entries = ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}}),
+    lists:foreach(fun({_, {_, Sig}}) ->
+        case ets:lookup(?BY_SIG, {DbName, Sig}) of
+            [{_, IndexPid}] -> (catch gen_server:cast(IndexPid, ddoc_updated));
+            [] -> ok
+        end
+    end, Entries);
+update_notify(_) ->
+    ok.
+

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_index/src/couch_index_updater.erl
----------------------------------------------------------------------
diff --git a/apps/couch_index/src/couch_index_updater.erl b/apps/couch_index/src/couch_index_updater.erl
new file mode 100644
index 0000000..9f54a56
--- /dev/null
+++ b/apps/couch_index/src/couch_index_updater.erl
@@ -0,0 +1,200 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_updater).
+-behaviour(gen_server).
+
+
+%% API
+-export([start_link/2, run/2, is_running/1, update/2, restart/2]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+%% Updater server state.
+-record(st, {
+    % Owning index process: receives the {updated, _} and {update_error, _}
+    % casts, and an 'EXIT' from it stops this server.
+    idx,
+    % Index callback module (Mod:get/2, Mod:process_doc/3, ...).
+    mod,
+    % Current updater worker. Starts as nil and is reset to undefined when
+    % a worker ends; both mean "no worker" because callers test is_pid/1.
+    pid=nil
+}).
+
+
+%% Start an updater server, linked to the caller, that serves the index
+%% process Index using the callback module Module.
+start_link(Index, Module) ->
+    InitArg = {Index, Module},
+    gen_server:start_link(?MODULE, InitArg, []).
+
+
+%% Ask the updater to start a background update pass from IdxState.
+%% Always replies ok; does nothing if a pass is already running.
+run(Updater, IdxState) ->
+    Request = {update, IdxState},
+    gen_server:call(Updater, Request).
+
+
+%% true while an updater worker process is alive, false otherwise.
+is_running(Updater) ->
+    Request = is_running,
+    gen_server:call(Updater, Request).
+
+
+%% Run an update pass in the calling process with no owning index
+%% (Idx = nil). update/3 ends the pass by calling exit/1.
+update(Mod, State) ->
+    NoIndex = nil,
+    update(NoIndex, Mod, State).
+
+
+%% Kill any running update pass and start a fresh one from IdxState.
+restart(Updater, IdxState) ->
+    Request = {restart, IdxState},
+    gen_server:call(Updater, Request).
+
+
+%% Trap exits so a worker's (and the index's) exit reason arrives as an
+%% {'EXIT', Pid, Reason} message in handle_info/2.
+init({Index, Module}) ->
+    process_flag(trap_exit, true),
+    InitialState = #st{idx=Index, mod=Module},
+    {ok, InitialState}.
+
+
+%% Shut down the current worker (if any) via couch_util:shutdown_sync/1
+%% before this server goes away.
+terminate(_Reason, #st{pid=Worker}) ->
+    couch_util:shutdown_sync(Worker),
+    ok.
+
+
+%% {update, IdxState}  - start a worker unless one is already running.
+%% {restart, IdxState} - shut down any running worker, then start a new one.
+%% is_running          - report whether a worker is alive.
+handle_call({update, _IdxState}, _From, #st{pid=Running}=State)
+        when is_pid(Running) ->
+    {reply, ok, State};
+handle_call({update, IdxState}, _From, #st{idx=Idx, mod=Mod}=State) ->
+    LogArgs = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
+    ?LOG_INFO("Starting index update for db: ~s idx: ~s", LogArgs),
+    Worker = spawn_link(fun() -> update(Idx, Mod, IdxState) end),
+    {reply, ok, State#st{pid=Worker}};
+handle_call({restart, IdxState}, _From, #st{idx=Idx, mod=Mod, pid=Old}=State) ->
+    LogArgs = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
+    ?LOG_INFO("Restarting index update for db: ~s idx: ~s", LogArgs),
+    if
+        is_pid(Old) -> couch_util:shutdown_sync(Old);
+        true -> ok
+    end,
+    Worker = spawn_link(fun() -> update(Idx, Mod, IdxState) end),
+    {reply, ok, State#st{pid=Worker}};
+handle_call(is_running, _From, #st{pid=Worker}=State) ->
+    {reply, is_pid(Worker), State}.
+
+
+%% No casts are part of this server's protocol; any cast stops it.
+handle_cast(_Request, St) ->
+    {stop, unknown_cast, St}.
+
+
+%% Handles exits of the spawn_link'd updater worker and of the owning index
+%% process (trap_exit is set in init/1). Clause order matters: the specific
+%% worker outcomes must be tried before the generic error clause.
+%%
+%% A completed pass exits with {updated, Pid, IdxState} (see update/3):
+%% log it and hand the new state to the index process.
+handle_info({'EXIT', _, {updated, Pid, IdxState}}, #st{pid=Pid}=State) ->
+    Mod = State#st.mod,
+    Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
+    ?LOG_INFO("Index update finished for db: ~s idx: ~s", Args),
+    ok = gen_server:cast(State#st.idx, {updated, IdxState}),
+    {noreply, State#st{pid=undefined}};
+% The worker exited with {reset, Pid} because purge_index/3 returned reset:
+% fetch a fresh state from the index process and start a new worker on it.
+% NOTE(review): this is a synchronous call into the index process; confirm
+% the index never blocks on a call to this server at the same moment.
+handle_info({'EXIT', _, {reset, Pid}}, #st{idx=Idx, pid=Pid}=State) ->
+    {ok, NewIdxState} = gen_server:call(State#st.idx, reset),
+    Pid2 = spawn_link(fun() -> update(Idx, State#st.mod, NewIdxState) end),
+    {noreply, State#st{pid=Pid2}};
+% The current worker exited normally without reporting a result.
+handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) ->
+    {noreply, State#st{pid=undefined}};
+% An uncaught throw exits with {{nocatch, Value}, Stacktrace}; unwrap it so
+% the clauses below see the thrown value itself.
+handle_info({'EXIT', Pid, {{nocatch, Error}, _Trace}}, State) ->
+    handle_info({'EXIT', Pid, Error}, State);
+% Any other failure of the current worker is reported to the index process.
+handle_info({'EXIT', Pid, Error}, #st{pid=Pid}=State) ->
+    ok = gen_server:cast(State#st.idx, {update_error, Error}),
+    {noreply, State#st{pid=undefined}};
+% The owning index process died: stop along with it.
+handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) ->
+    {stop, normal, State};
+% Normal exits from any other linked process are ignored.
+handle_info({'EXIT', _Pid, normal}, State) ->
+    {noreply, State};
+% Anything else is unexpected and stops the server.
+handle_info(_Mesg, State) ->
+    {stop, unknown_info, State}.
+
+
+%% Hot code upgrade hook: the state is carried over unchanged.
+code_change(_PreviousVsn, St, _Extra) ->
+    {ok, St}.
+
+
+%% Run one incremental update pass for the index in IdxState. Every change
+%% since the index's update_seq is fed to Mod, and the pass ends by
+%% terminating the calling process with exit/1:
+%%   {updated, self(), FinalIdxState} - the pass completed
+%%   {reset, self()}                  - purge_index/3 returned reset
+%% Idx is passed through to Mod:start_update/3 (nil when called via update/2).
+%% Update options read from IdxState:
+%%   committed_only - stop before the first change past the committed seq
+%%   include_design - also pass _design/ documents to the index
+%%   local_seq      - also open documents with the local_seq option
+update(Idx, Mod, IdxState) ->
+    DbName = Mod:get(db_name, IdxState),
+    CurrSeq = Mod:get(update_seq, IdxState),
+    UpdateOpts = Mod:get(update_options, IdxState),
+    CommittedOnly = lists:member(committed_only, UpdateOpts),
+    IncludeDesign = lists:member(include_design, UpdateOpts),
+    DocOpts = case lists:member(local_seq, UpdateOpts) of
+        true -> [conflicts, deleted_conflicts, local_seq];
+        _ -> [conflicts, deleted_conflicts]
+    end,
+
+    couch_util:with_db(DbName, fun(Db) ->
+        DbUpdateSeq = couch_db:get_update_seq(Db),
+        DbCommittedSeq = couch_db:get_committed_update_seq(Db),
+
+        % Apply a pending purge first. If the index is further out of step
+        % than one purge, it cannot catch up here. Exit instead, so the
+        % updater server fetches a reset index state and starts over.
+        PurgedIdxState = case purge_index(Db, Mod, IdxState) of
+            {ok, IdxState0} -> IdxState0;
+            reset -> exit({reset, self()})
+        end,
+
+        NumChanges = couch_db:count_changes_since(Db, CurrSeq),
+
+        % Map a #doc_info{} to the {Doc, Seq} pair handed to
+        % Mod:process_doc/3. A design doc becomes nil unless include_design
+        % is set. A deleted doc becomes a bare tombstone and is not opened.
+        LoadDoc = fun(DocInfo) ->
+            #doc_info{
+                id=DocId,
+                high_seq=Seq,
+                revs=[#rev_info{deleted=Deleted} | _]
+            } = DocInfo,
+
+            case {IncludeDesign, DocId} of
+                {false, <<"_design/", _/binary>>} ->
+                    {nil, Seq};
+                _ when Deleted ->
+                    {#doc{id=DocId, deleted=true}, Seq};
+                _ ->
+                    {ok, Doc} = couch_db:open_doc_int(Db, DocInfo, DocOpts),
+                    {Doc, Seq}
+            end
+        end,
+
+        % Fold over the changes since CurrSeq. The accumulator is
+        % {IdxState, SendLast}. SendLast becomes false when committed_only
+        % stops the fold at a change the database has not committed yet.
+        Proc = fun(DocInfo, _, {IdxStateAcc, _}) ->
+            HighSeq = DocInfo#doc_info.high_seq,
+            % Short-circuit: the seq comparison only matters for committed_only.
+            case CommittedOnly andalso (HighSeq > DbCommittedSeq) of
+                true ->
+                    {stop, {IdxStateAcc, false}};
+                false ->
+                    {Doc, Seq} = LoadDoc(DocInfo),
+                    {ok, NewSt} = Mod:process_doc(Doc, Seq, IdxStateAcc),
+                    {ok, {NewSt, true}}
+            end
+        end,
+
+        {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges),
+        Acc0 = {InitIdxState, true},
+        {ok, _, Acc} = couch_db:enum_docs_since(Db, CurrSeq, Proc, Acc0, []),
+        {ProcIdxSt, SendLast} = Acc,
+
+        % If committed_only did not stop the fold, pass a nil doc at the
+        % database's current update_seq so the index can advance to it.
+        {ok, LastIdxSt} = case SendLast of
+            true ->
+                Mod:process_doc(nil, DbUpdateSeq, ProcIdxSt);
+            _ ->
+                {ok, ProcIdxSt}
+        end,
+
+        {ok, FinalIdxState} = Mod:finish_update(LastIdxSt),
+        exit({updated, self(), FinalIdxState})
+    end).
+
+
+%% Bring the index in line with the database's purge sequence:
+%%   sequences equal      -> {ok, IdxState}
+%%   db exactly one ahead -> result of Mod:purge/4 for the last purge
+%%   anything else        -> reset (one purge cannot close the gap)
+purge_index(Db, Mod, IdxState) ->
+    DbPurgeSeq = couch_db:get_purge_seq(Db),
+    IdxPurgeSeq = Mod:get(purge_seq, IdxState),
+    case DbPurgeSeq of
+        Same when Same == IdxPurgeSeq ->
+            {ok, IdxState};
+        Next when Next == IdxPurgeSeq + 1 ->
+            {ok, PurgedIdRevs} = couch_db:get_last_purged(Db),
+            Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState);
+        _ ->
+            reset
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_index/src/couch_index_util.erl
----------------------------------------------------------------------
diff --git a/apps/couch_index/src/couch_index_util.erl b/apps/couch_index/src/couch_index_util.erl
new file mode 100644
index 0000000..c833920
--- /dev/null
+++ b/apps/couch_index/src/couch_index_util.erl
@@ -0,0 +1,77 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_util).
+
+-export([root_dir/0, index_dir/2, index_file/3]).
+-export([load_doc/3, sort_lib/1, hexsig/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+%% Root directory for index files: the [couchdb] view_index_dir setting.
+root_dir() ->
+    Section = "couchdb",
+    couch_config:get(Section, "view_index_dir").
+
+
+%% Directory holding Module's index files for a database, given either an
+%% open #db{} or the database name as a binary:
+%%   <root_dir()>/.<DbName>_design/<Module>
+index_dir(Module, #db{}=Db) ->
+    index_dir(Module, couch_db:name(Db));
+index_dir(Module, DbName) when is_binary(DbName) ->
+    DbDir = lists:append([".", binary_to_list(DbName), "_design"]),
+    filename:join([root_dir(), DbDir, Module]).
+
+
+%% Full path of FileName inside Module's index directory for DbName.
+index_file(Module, DbName, FileName) ->
+    Dir = index_dir(Module, DbName),
+    filename:join(Dir, FileName).
+
+
+%% Load a document for indexing, returning null instead of failing.
+%%   #doc_info{}: deleted docs are returned only if Opts contains deleted.
+%%   {DocId, Rev}: Rev may be nil (current revision); deleted docs are
+%%     never returned.
+load_doc(Db, #doc_info{}=DI, Opts) ->
+    WantDeleted = lists:member(deleted, Opts),
+    case (catch couch_db:open_doc(Db, DI, Opts)) of
+        {ok, #doc{deleted=true}=Doc} when WantDeleted -> Doc;
+        {ok, #doc{deleted=false}=Doc} -> Doc;
+        _ -> null
+    end;
+load_doc(Db, {DocId, Rev}, Opts) ->
+    Result = (catch load_doc(Db, DocId, Rev, Opts)),
+    case Result of
+        #doc{deleted=false}=Doc -> Doc;
+        _ -> null
+    end.
+
+
+%% Open DocId at revision Rev and return the #doc{}, or null when it
+%% cannot be opened. Rev = nil opens the current revision. A specific
+%% revision may come back as a deletion stub (deleted=true).
+load_doc(Db, DocId, nil, Options) ->
+    case (catch couch_db:open_doc(Db, DocId, Options)) of
+        {ok, Doc} -> Doc;
+        _Error -> null
+    end;
+load_doc(Db, DocId, Rev, Options) ->
+    case (catch couch_db:open_doc_revs(Db, DocId, [Rev], Options)) of
+        {ok, [{ok, Doc}]} -> Doc;
+        % {{not_found, missing}, Rev}, a caught exception or any other
+        % reply: treat the revision as unavailable, like the nil clause,
+        % rather than crashing with case_clause.
+        _Else -> null
+    end.
+
+
+%% Sort an EJSON object {Props} by key, recursively sorting nested
+%% objects, and return the sorted proplist. Entries are collected in
+%% reverse before a stable keysort, so that ordering is what duplicate
+%% keys end up in.
+sort_lib({Props}) ->
+    sort_lib(Props, []).
+sort_lib([], Acc) ->
+    lists:keysort(1, Acc);
+sort_lib([{Name, Value} | Tail], Acc) ->
+    Entry = case Value of
+        {Nested} -> {Name, sort_lib(Nested, [])};
+        _ -> {Name, Value}
+    end,
+    sort_lib(Tail, [Entry | Acc]).
+
+
+%% Hex-encode a binary index signature via couch_util:to_hex/1.
+hexsig(Sig) ->
+    Bytes = binary_to_list(Sig),
+    couch_util:to_hex(Bytes).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_mrview/Makefile.am
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/Makefile.am b/apps/couch_mrview/Makefile.am
new file mode 100644
index 0000000..2b9ef86
--- /dev/null
+++ b/apps/couch_mrview/Makefile.am
@@ -0,0 +1,73 @@
+## Licensed under the Apache License, Version 2.0 (the "License"); you may not
+## use this file except in compliance with the License. You may obtain a copy of
+## the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+## License for the specific language governing permissions and limitations under
+## the License.
+
+## Install locations for the couch_mrview application directory tree.
+couch_mrviewlibdir = $(localerlanglibdir)/couch_mrview-0.1
+couch_mrviewincludedir = $(couch_mrviewlibdir)/include
+couch_mrviewebindir = $(couch_mrviewlibdir)/ebin
+
+## Headers are installed to include/, the .app and .beam files to ebin/.
+couch_mrviewinclude_DATA = $(include_files)
+couch_mrviewebin_DATA = $(compiled_files)
+
+include_files = \
+    include/couch_mrview.hrl
+
+## Sources shipped in the distribution tarball (see EXTRA_DIST below).
+source_files = \
+    src/couch_mrview.app.src \
+    src/couch_mrview.erl \
+    src/couch_mrview_cleanup.erl \
+    src/couch_mrview_compactor.erl \
+    src/couch_mrview_http.erl \
+    src/couch_mrview_index.erl \
+    src/couch_mrview_show.erl \
+    src/couch_mrview_test_util.erl \
+    src/couch_mrview_updater.erl \
+    src/couch_mrview_util.erl
+
+test_files = \
+    test/01-load.t \
+    test/02-map-views.t \
+    test/03-red-views.t \
+    test/04-index-info.t \
+    test/05-collation.t \
+    test/06-all-docs.t \
+	test/07-compact-swap.t
+
+## Build products: the .app for the .app.src and one .beam per src/*.erl;
+## keep in sync with source_files.
+compiled_files = \
+    ebin/couch_mrview.app \
+    ebin/couch_mrview.beam \
+    ebin/couch_mrview_cleanup.beam \
+    ebin/couch_mrview_compactor.beam \
+    ebin/couch_mrview_http.beam \
+    ebin/couch_mrview_index.beam \
+    ebin/couch_mrview_show.beam \
+    ebin/couch_mrview_test_util.beam \
+    ebin/couch_mrview_updater.beam \
+    ebin/couch_mrview_util.beam
+
+EXTRA_DIST = $(include_files) $(source_files) $(test_files)
+CLEANFILES = $(compiled_files)
+
+## Run the etap tests when the TESTS automake conditional is enabled.
+## NOTE(review): this test path and the -I flags below still point at the
+## old src/ layout although this file now lives under apps/couch_mrview;
+## confirm once the move to rebar is finished.
+check:
+if TESTS
+	$(abs_top_builddir)/test/etap/run $(abs_top_srcdir)/src/couch_mrview/test
+endif
+
+## Generate the .app file by expanding %version% in the .app.src template.
+ebin/%.app: src/%.app.src
+	@mkdir -p ebin/
+	sed -e "s|%version%|@version@|g" \
+	< $< > $@
+
+## Compile one module; every header in include_files is a prerequisite, so
+## editing a header rebuilds all modules.
+ebin/%.beam: src/%.erl $(include_files)
+	@mkdir -p ebin/
+	$(ERLC) -Wall -I$(top_srcdir)/src -I$(top_srcdir)/src/couchdb \
+        -o ebin/ $(ERLC_FLAGS) ${TEST} $<;
+

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_mrview/include/couch_mrview.hrl
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/include/couch_mrview.hrl b/apps/couch_mrview/include/couch_mrview.hrl
new file mode 100644
index 0000000..e4ec66d
--- /dev/null
+++ b/apps/couch_mrview/include/couch_mrview.hrl
@@ -0,0 +1,81 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% State of one design document's map/reduce index.
+%% NOTE(review): the second group of fields (after the blank line) looks
+%% like working state used while an update runs - confirm.
+-record(mrst, {
+    sig=nil,          % index signature; nil until set
+    fd=nil,           % presumably the index file handle
+    refc,
+    db_name,
+    idx_name,         % presumably the design document id
+    language,
+    design_opts=[],
+    lib,
+    views,            % presumably a list of #mrview{}
+    id_btree=nil,
+    update_seq=0,     % presumably the db update seq indexed so far
+    purge_seq=0,      % presumably the db purge seq applied so far
+
+    first_build,
+    partial_resp_pid,
+    doc_acc,
+    doc_queue,
+    write_queue,
+    qserver=nil
+}).
+
+
+%% One view of an index (presumably an element of #mrst.views - confirm).
+-record(mrview, {
+    id_num,            % presumably the view's number within the index
+    update_seq=0,
+    purge_seq=0,
+    map_names=[],      % presumably names of the views sharing this map
+    reduce_funs=[],
+    def,               % presumably the map function source
+    btree=nil,
+    options=[]
+}).
+
+
+%% Index header. Records are positional tuples; if this header is written
+%% to disk (presumably so - confirm), its field order must not change.
+-record(mrheader, {
+    seq=0,
+    purge_seq=0,
+    id_btree_state=nil,
+    view_states=nil
+}).
+
+
+%% Arguments for a view query (presumably parsed from the request - confirm).
+-record(mrargs, {
+    view_type,          % presumably map | red
+    reduce,
+
+    preflight_fun,
+
+    start_key,
+    start_key_docid,
+    end_key,
+    end_key_docid,
+    keys,
+
+    direction = fwd,    % presumably fwd | rev
+    limit = 16#10000000,  % 268435456; presumably a stand-in for "no limit"
+    skip = 0,
+    group_level = 0,
+    stale = false,
+    inclusive_end = true,
+    include_docs = false,
+    doc_options = [],
+    update_seq=false,
+    conflicts,
+    callback,
+    list,
+    extra = []
+}).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_mrview/src/couch_mrview.app.src
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/src/couch_mrview.app.src b/apps/couch_mrview/src/couch_mrview.app.src
new file mode 100644
index 0000000..52898a6
--- /dev/null
+++ b/apps/couch_mrview/src/couch_mrview.app.src
@@ -0,0 +1,28 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_mrview, [
+    {description, "CouchDB Map/Reduce Views"},
+    {vsn, "%version%"},
+    % Must list every module shipped by this application; keep in sync
+    % with the .beam files in compiled_files of Makefile.am.
+    {modules, [
+        couch_mrview,
+        couch_mrview_cleanup,
+        couch_mrview_compactor,
+        couch_mrview_http,
+        couch_mrview_index,
+        couch_mrview_show,
+        couch_mrview_test_util,
+        couch_mrview_updater,
+        couch_mrview_util
+    ]},
+    {registered, []},
+    {applications, [kernel, stdlib, couch_index]}
+]}.


Mime
View raw message