author     Christopher Lenz <cmlenz@apache.org>  2008-03-28 23:32:19 +0000
committer  Christopher Lenz <cmlenz@apache.org>  2008-03-28 23:32:19 +0000
commit     544a38dd45f6a58d34296c6c768afd086eb2ac70
tree       c84cc02340b06aae189cff0dbfaee698f273f1f5 /src/couchdb
parent     804cbbe033b8e7a3e8d7058aaf31bdf69ef18ac5
Imported trunk.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@642432 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb')
-rw-r--r--  src/couchdb/Makefile.am                  |  97
-rw-r--r--  src/couchdb/cjson.erl                    | 567
-rw-r--r--  src/couchdb/couch.app.tpl.in             |  29
-rw-r--r--  src/couchdb/couch_btree.erl              | 590
-rw-r--r--  src/couchdb/couch_db.erl                 | 757
-rw-r--r--  src/couchdb/couch_db.hrl                 |  56
-rw-r--r--  src/couchdb/couch_db_update_notifier.erl |  66
-rw-r--r--  src/couchdb/couch_doc.erl                | 199
-rw-r--r--  src/couchdb/couch_erl_driver.c           | 160
-rw-r--r--  src/couchdb/couch_event_sup.erl          |  69
-rw-r--r--  src/couchdb/couch_file.erl               | 323
-rw-r--r--  src/couchdb/couch_ft_query.erl           |  78
-rw-r--r--  src/couchdb/couch_js.c                   | 452
-rw-r--r--  src/couchdb/couch_key_tree.erl           | 139
-rw-r--r--  src/couchdb/couch_log.erl                | 130
-rw-r--r--  src/couchdb/couch_query_servers.erl      | 206
-rw-r--r--  src/couchdb/couch_rep.erl                | 308
-rw-r--r--  src/couchdb/couch_server.erl             | 215
-rw-r--r--  src/couchdb/couch_server_sup.erl         | 185
-rw-r--r--  src/couchdb/couch_stream.erl             | 252
-rw-r--r--  src/couchdb/couch_util.erl               | 316
-rw-r--r--  src/couchdb/couch_view.erl               | 616
-rw-r--r--  src/couchdb/mod_couch.erl                | 891
23 files changed, 6701 insertions(+), 0 deletions(-)
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
new file mode 100644
index 00000000..bf0c31bc
--- /dev/null
+++ b/src/couchdb/Makefile.am
@@ -0,0 +1,97 @@
+## Licensed under the Apache License, Version 2.0 (the "License"); you may not
+## use this file except in compliance with the License. You may obtain a copy
+## of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+## License for the specific language governing permissions and limitations under
+## the License.
+
+datarootdir = @prefix@/share
+
+ICU_LOCAL_FLAGS = $(ICU_LOCAL_CFLAGS) $(ICU_LOCAL_LDFLAGS)
+
+couchprivlibdir = $(erlanglibdir)/couch-$(version)/priv/lib
+
+couchprivlib_LTLIBRARIES = couch_erl_driver.la
+couch_erl_driver_la_SOURCES = couch_erl_driver.c
+couch_erl_driver_la_LDFLAGS = -module -avoid-version $(ICU_LOCAL_FLAGS)
+couch_erl_driver_la_CFLAGS = $(ICU_LOCAL_FLAGS)
+couch_erl_driver_la_LIBADD = -licuuc -licudata -licui18n
+
+libbin_PROGRAMS = couchjs
+couchjs_SOURCES = couch_js.c
+
+couchebindir = $(erlanglibdir)/couch-$(version)/ebin
+couchincludedir = $(erlanglibdir)/couch-$(version)/include
+
+couch_file_collection = \
+ cjson.erl \
+ couch_btree.erl \
+ couch_db.erl \
+ couch_db_update_notifier.erl \
+ couch_doc.erl \
+ couch_event_sup.erl \
+ couch_file.erl \
+ couch_ft_query.erl \
+ couch_key_tree.erl \
+ couch_log.erl \
+ couch_query_servers.erl \
+ couch_rep.erl \
+ couch_server.erl \
+ couch_server_sup.erl \
+ couch_stream.erl \
+ couch_util.erl \
+ couch_view.erl \
+ mod_couch.erl
+
+couchebin_DATA = \
+ cjson.beam \
+ couch.app \
+ couch_btree.beam \
+ couch_db.beam \
+ couch_db_update_notifier.beam \
+ couch_doc.beam \
+ couch_event_sup.beam \
+ couch_file.beam \
+ couch_ft_query.beam \
+ couch_key_tree.beam \
+ couch_log.beam \
+ couch_query_servers.beam \
+ couch_rep.beam \
+ couch_server.beam \
+ couch_server_sup.beam \
+ couch_stream.beam \
+ couch_util.beam \
+ couch_view.beam \
+ mod_couch.beam
+
+couchinclude_DATA = couch_db.hrl
+
+EXTRA_DIST = $(couch_file_collection) $(couchinclude_DATA)
+
+CLEANFILES = $(couchebin_DATA)
+
+couch.app: couch.app.tpl
+ sed -e "s|%package_name%|@package_name@|g" \
+ -e "s|%version%|@version@|g" > \
+ $@ < $<
+ chmod +x $@
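+
+## Illustrative note: @package_name@ and @version@ are filled in by
+## configure when this Makefile is generated from Makefile.am, so the sed
+## invocation above replaces the %package_name% and %version% placeholders
+## in couch.app.tpl with those configured values to produce couch.app.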
+
+%.beam: %.erl
+ erlc $<
+
+install-data-hook:
+ if test -f "$(DESTDIR)/$(couchprivlibdir)/couch_erl_driver"; then \
+ rm -f "$(DESTDIR)/$(couchprivlibdir)/couch_erl_driver.so"; \
+ cd "$(DESTDIR)/$(couchprivlibdir)" && \
+ $(LN_S) couch_erl_driver couch_erl_driver.so; \
+ fi
+
+uninstall-local:
+ if test -f "$(DESTDIR)/$(couchprivlibdir)/couch_erl_driver"; then \
+ rm -f "$(DESTDIR)/$(couchprivlibdir)/couch_erl_driver.so"; \
+ fi
diff --git a/src/couchdb/cjson.erl b/src/couchdb/cjson.erl
new file mode 100644
index 00000000..042d5c41
--- /dev/null
+++ b/src/couchdb/cjson.erl
@@ -0,0 +1,567 @@
+%% @author Bob Ippolito <bob@mochimedia.com>
+%% @copyright 2006 Mochi Media, Inc.
+%%
+%% Permission is hereby granted, free of charge, to any person
+%% obtaining a copy of this software and associated documentation
+%% files (the "Software"), to deal in the Software without restriction,
+%% including without limitation the rights to use, copy, modify, merge,
+%% publish, distribute, sublicense, and/or sell copies of the Software,
+%% and to permit persons to whom the Software is furnished to do
+%% so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included
+%% in all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+%% MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+%% IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+%% CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+%% TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+%% SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+%% @doc Yet another JSON (RFC 4627) library for Erlang.
+
+-module(cjson).
+-author('bob@mochimedia.com').
+-export([encoder/1, encode/1]).
+-export([decoder/1, decode/1]).
+-export([test/0]).
+
+%
+% NOTE: This file was originally mochijson.erl and has been adapted for
+% use with CouchDB.
+%
+% The changes are:
+% {array, [...]}
+% is now
+% {...}
+% and:
+% {struct, [...]}
+% is now
+% {obj, [...]}
+%
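+%
+% For example (illustrative): under this convention the JSON text
+% {"ok":[1,"a"]} corresponds to the Erlang term
+% {obj, [{"ok", {1, "a"}}]}, and the bare JSON array [1,2,3]
+% corresponds to the tuple {1, 2, 3}.
+%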
+
+% This is a macro to placate syntax highlighters..
+-define(Q, $\").
+-define(ADV_COL(S, N), S#decoder{column=N+S#decoder.column}).
+-define(INC_COL(S), S#decoder{column=1+S#decoder.column}).
+-define(INC_LINE(S), S#decoder{column=1, line=1+S#decoder.line}).
+
+%% @type iolist() = [char() | binary() | iolist()]
+%% @type iodata() = iolist() | binary()
+%% @type json_string() = atom | string() | binary()
+%% @type json_number() = integer() | float()
+%% @type json_array() = {json_term()}
+%% @type json_object() = {obj, [{json_string(), json_term()}]}
+%% @type json_term() = json_string() | json_number() | json_array() |
+%% json_object()
+%% @type encoding() = utf8 | unicode
+%% @type encoder_option() = {input_encoding, encoding()} |
+%% {handler, function()}
+%% @type decoder_option() = {input_encoding, encoding()} |
+%% {object_hook, function()}
+
+-record(encoder, {input_encoding=utf8,
+ handler=null}).
+
+-record(decoder, {input_encoding=utf8,
+ object_hook=null,
+ line=1,
+ column=1,
+ state=null}).
+
+%% @spec encoder([encoder_option()]) -> function()
+%% @doc Create an encoder/1 with the given options.
+encoder(Options) ->
+ State = parse_encoder_options(Options, #encoder{}),
+ fun (O) -> json_encode(O, State) end.
+
+%% @spec encode(json_term()) -> iolist()
+%% @doc Encode the given term as JSON to an iolist.
+encode(Any) ->
+ json_encode(Any, #encoder{}).
+
+%% @spec decoder([decoder_option()]) -> function()
+%% @doc Create a decoder/1 with the given options.
+decoder(Options) ->
+ State = parse_decoder_options(Options, #decoder{}),
+ fun (O) -> json_decode(O, State) end.
+
+%% @spec decode(iolist()) -> json_term()
+%% @doc Decode the given iolist to Erlang terms.
+decode(S) ->
+ json_decode(S, #decoder{}).
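+
+%% Illustrative usage (not from the original docs), round-tripping a term
+%% through the encoder and decoder in the shell:
+%%
+%%   1> lists:flatten(cjson:encode({obj, [{"foo", {1, 2}}]})).
+%%   "{\"foo\":[1,2]}"
+%%   2> cjson:decode("{\"foo\":[1,2]}").
+%%   {obj,[{"foo",{1,2}}]}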
+
+test() ->
+ test_all().
+
+%% Internal API
+
+parse_encoder_options([], State) ->
+ State;
+parse_encoder_options([{input_encoding, Encoding} | Rest], State) ->
+ parse_encoder_options(Rest, State#encoder{input_encoding=Encoding});
+parse_encoder_options([{handler, Handler} | Rest], State) ->
+ parse_encoder_options(Rest, State#encoder{handler=Handler}).
+
+parse_decoder_options([], State) ->
+ State;
+parse_decoder_options([{input_encoding, Encoding} | Rest], State) ->
+ parse_decoder_options(Rest, State#decoder{input_encoding=Encoding});
+parse_decoder_options([{object_hook, Hook} | Rest], State) ->
+ parse_decoder_options(Rest, State#decoder{object_hook=Hook}).
+
+
+format_float(F) ->
+ format_float1(lists:reverse(float_to_list(F)), []).
+
+format_float1([$0, $0, _, $e | Rest], []) ->
+ strip_zeros(Rest, []);
+format_float1([Sign, $e | Rest], Acc) ->
+ strip_zeros(Rest, [$e, Sign | Acc]);
+format_float1([C | Rest], Acc) ->
+ format_float1(Rest, [C | Acc]).
+
+strip_zeros(L=[$0, $. | _], Acc) ->
+ lists:reverse(L, Acc);
+strip_zeros([$0 | Rest], Acc) ->
+ strip_zeros(Rest, Acc);
+strip_zeros(L, Acc) ->
+ lists:reverse(L, Acc).
+
+json_encode(true, _State) ->
+ "true";
+json_encode(false, _State) ->
+ "false";
+json_encode(null, _State) ->
+ "null";
+json_encode(I, _State) when is_integer(I) ->
+ integer_to_list(I);
+json_encode(F, _State) when is_float(F) ->
+ format_float(F);
+json_encode(L, State) when is_list(L); is_binary(L); is_atom(L) ->
+ json_encode_string(L, State);
+json_encode({obj, Props}, State) when is_list(Props) ->
+ json_encode_proplist(Props, State);
+json_encode(Array, State) when is_tuple(Array) ->
+ json_encode_array(Array, State);
+json_encode(Bad, #encoder{handler=null}) ->
+ exit({json_encode, {bad_term, Bad}});
+json_encode(Bad, State=#encoder{handler=Handler}) ->
+ json_encode(Handler(Bad), State).
+
+json_encode_array({}, _State) ->
+ "[]";
+json_encode_array(Tuple, State) ->
+ F = fun (O, Acc) ->
+ [$,, json_encode(O, State) | Acc]
+ end,
+ [$, | Acc1] = lists:foldl(F, "[", tuple_to_list(Tuple)),
+ lists:reverse([$\] | Acc1]).
+
+json_encode_proplist([], _State) ->
+ "{}";
+json_encode_proplist(Props, State) ->
+ F = fun ({K, V}, Acc) ->
+ KS = case K of
+ K when is_atom(K) ->
+ json_encode_string_utf8(atom_to_list(K), [?Q]);
+ K when is_integer(K) ->
+ json_encode_string(integer_to_list(K), State);
+ K when is_list(K); is_binary(K) ->
+ json_encode_string(K, State)
+ end,
+ VS = json_encode(V, State),
+ [$,, VS, $:, KS | Acc]
+ end,
+ [$, | Acc1] = lists:foldl(F, "{", Props),
+ lists:reverse([$\} | Acc1]).
+
+json_encode_string(A, _State) when is_atom(A) ->
+ json_encode_string_unicode(xmerl_ucs:from_utf8(atom_to_list(A)), [?Q]);
+json_encode_string(B, _State) when is_binary(B) ->
+ json_encode_string_unicode(xmerl_ucs:from_utf8(B), [?Q]);
+json_encode_string(S, #encoder{input_encoding=utf8}) ->
+ json_encode_string_utf8(S, [?Q]);
+json_encode_string(S, #encoder{input_encoding=unicode}) ->
+ json_encode_string_unicode(S, [?Q]).
+
+json_encode_string_utf8([], Acc) ->
+ lists:reverse([$\" | Acc]);
+json_encode_string_utf8(All=[C | Cs], Acc) ->
+ case C of
+ C when C >= 16#7f ->
+ json_encode_string_unicode(xmerl_ucs:from_utf8(All), Acc);
+ _ ->
+ Acc1 = case C of
+ ?Q ->
+ [?Q, $\\ | Acc];
+ $/ ->
+ [$/, $\\ | Acc];
+ $\\ ->
+ [$\\, $\\ | Acc];
+ $\b ->
+ [$b, $\\ | Acc];
+ $\f ->
+ [$f, $\\ | Acc];
+ $\n ->
+ [$n, $\\ | Acc];
+ $\r ->
+ [$r, $\\ | Acc];
+ $\t ->
+ [$t, $\\ | Acc];
+ C when C >= 0, C < $\s ->
+ [unihex(C) | Acc];
+ C when C >= $\s ->
+ [C | Acc];
+ _ ->
+ exit({json_encode, {bad_char, C}})
+ end,
+ json_encode_string_utf8(Cs, Acc1)
+ end.
+
+json_encode_string_unicode([], Acc) ->
+ lists:reverse([$\" | Acc]);
+json_encode_string_unicode([C | Cs], Acc) ->
+ Acc1 = case C of
+ ?Q ->
+ [?Q, $\\ | Acc];
+ $/ ->
+ [$/, $\\ | Acc];
+ $\\ ->
+ [$\\, $\\ | Acc];
+ $\b ->
+ [$b, $\\ | Acc];
+ $\f ->
+ [$f, $\\ | Acc];
+ $\n ->
+ [$n, $\\ | Acc];
+ $\r ->
+ [$r, $\\ | Acc];
+ $\t ->
+ [$t, $\\ | Acc];
+ C when C >= 0, C < $\s; C >= 16#7f, C =< 16#10FFFF ->
+ [unihex(C) | Acc];
+ C when C < 16#7f ->
+ [C | Acc];
+ _ ->
+ exit({json_encode, {bad_char, C}})
+ end,
+ json_encode_string_unicode(Cs, Acc1).
+
+dehex(C) when C >= $0, C =< $9 ->
+ C - $0;
+dehex(C) when C >= $a, C =< $f ->
+ C - $a + 10;
+dehex(C) when C >= $A, C =< $F ->
+ C - $A + 10.
+
+hexdigit(C) when C >= 0, C =< 9 ->
+ C + $0;
+hexdigit(C) when C =< 15 ->
+ C + $a - 10.
+
+unihex(C) when C < 16#10000 ->
+ <<D3:4, D2:4, D1:4, D0:4>> = <<C:16>>,
+ Digits = [hexdigit(D) || D <- [D3, D2, D1, D0]],
+ [$\\, $u | Digits];
+unihex(C) when C =< 16#10FFFF ->
+ N = C - 16#10000,
+ S1 = 16#d800 bor ((N bsr 10) band 16#3ff),
+ S2 = 16#dc00 bor (N band 16#3ff),
+ [unihex(S1), unihex(S2)].
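+
+% Worked example (illustrative): for U+1D11E, N = 16#1D11E - 16#10000
+% = 16#D11E, so S1 = 16#d800 bor (N bsr 10) = 16#D834 and
+% S2 = 16#dc00 bor (N band 16#3ff) = 16#DD1E, i.e. the escape
+% sequence \ud834\udd1e.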
+
+json_decode(B, S) when is_binary(B) ->
+ json_decode([B], S);
+json_decode(L, S) ->
+ {Res, L1, S1} = decode1(L, S),
+ {eof, [], _} = tokenize(L1, S1#decoder{state=trim}),
+ Res.
+
+decode1(L, S=#decoder{state=null}) ->
+ case tokenize(L, S#decoder{state=any}) of
+ {{const, C}, L1, S1} ->
+ {C, L1, S1};
+ {start_array, L1, S1} ->
+ decode_array(L1, S1#decoder{state=any}, []);
+ {start_object, L1, S1} ->
+ decode_object(L1, S1#decoder{state=key}, [])
+ end.
+
+make_object(V, #decoder{object_hook=null}) ->
+ V;
+make_object(V, #decoder{object_hook=Hook}) ->
+ Hook(V).
+
+decode_object(L, S=#decoder{state=key}, Acc) ->
+ case tokenize(L, S) of
+ {end_object, Rest, S1} ->
+ V = make_object({obj, lists:reverse(Acc)}, S1),
+ {V, Rest, S1#decoder{state=null}};
+ {{const, K}, Rest, S1} when is_list(K) ->
+ {colon, L2, S2} = tokenize(Rest, S1),
+ {V, L3, S3} = decode1(L2, S2#decoder{state=null}),
+ decode_object(L3, S3#decoder{state=comma}, [{K, V} | Acc])
+ end;
+decode_object(L, S=#decoder{state=comma}, Acc) ->
+ case tokenize(L, S) of
+ {end_object, Rest, S1} ->
+ V = make_object({obj, lists:reverse(Acc)}, S1),
+ {V, Rest, S1#decoder{state=null}};
+ {comma, Rest, S1} ->
+ decode_object(Rest, S1#decoder{state=key}, Acc)
+ end.
+
+decode_array(L, S=#decoder{state=any}, Acc) ->
+ case tokenize(L, S) of
+ {end_array, Rest, S1} ->
+ {list_to_tuple(lists:reverse(Acc)), Rest, S1#decoder{state=null}};
+ {start_array, Rest, S1} ->
+ {Array, Rest1, S2} = decode_array(Rest, S1#decoder{state=any}, []),
+ decode_array(Rest1, S2#decoder{state=comma}, [Array | Acc]);
+ {start_object, Rest, S1} ->
+ {Array, Rest1, S2} = decode_object(Rest, S1#decoder{state=key}, []),
+ decode_array(Rest1, S2#decoder{state=comma}, [Array | Acc]);
+ {{const, Const}, Rest, S1} ->
+ decode_array(Rest, S1#decoder{state=comma}, [Const | Acc])
+ end;
+decode_array(L, S=#decoder{state=comma}, Acc) ->
+ case tokenize(L, S) of
+ {end_array, Rest, S1} ->
+ {list_to_tuple(lists:reverse(Acc)), Rest, S1#decoder{state=null}};
+ {comma, Rest, S1} ->
+ decode_array(Rest, S1#decoder{state=any}, Acc)
+ end.
+
+tokenize_string(IoList=[C | _], S=#decoder{input_encoding=utf8}, Acc)
+ when is_list(C); is_binary(C); C >= 16#7f ->
+ List = xmerl_ucs:from_utf8(list_to_binary(lists:flatten(IoList))),
+ tokenize_string(List, S#decoder{input_encoding=unicode}, Acc);
+tokenize_string("\"" ++ Rest, S, Acc) ->
+ {lists:reverse(Acc), Rest, ?INC_COL(S)};
+tokenize_string("\\\"" ++ Rest, S, Acc) ->
+ tokenize_string(Rest, ?ADV_COL(S, 2), [$\" | Acc]);
+tokenize_string("\\\\" ++ Rest, S, Acc) ->
+ tokenize_string(Rest, ?ADV_COL(S, 2), [$\\ | Acc]);
+tokenize_string("\\/" ++ Rest, S, Acc) ->
+ tokenize_string(Rest, ?ADV_COL(S, 2), [$/ | Acc]);
+tokenize_string("\\b" ++ Rest, S, Acc) ->
+ tokenize_string(Rest, ?ADV_COL(S, 2), [$\b | Acc]);
+tokenize_string("\\f" ++ Rest, S, Acc) ->
+ tokenize_string(Rest, ?ADV_COL(S, 2), [$\f | Acc]);
+tokenize_string("\\n" ++ Rest, S, Acc) ->
+ tokenize_string(Rest, ?ADV_COL(S, 2), [$\n | Acc]);
+tokenize_string("\\r" ++ Rest, S, Acc) ->
+ tokenize_string(Rest, ?ADV_COL(S, 2), [$\r | Acc]);
+tokenize_string("\\t" ++ Rest, S, Acc) ->
+ tokenize_string(Rest, ?ADV_COL(S, 2), [$\t | Acc]);
+tokenize_string([$\\, $u, C3, C2, C1, C0 | Rest], S, Acc) ->
+ % coalesce UTF-16 surrogate pair?
+ C = dehex(C0) bor
+ (dehex(C1) bsl 4) bor
+ (dehex(C2) bsl 8) bor
+ (dehex(C3) bsl 12),
+ tokenize_string(Rest, ?ADV_COL(S, 6), [C | Acc]);
+tokenize_string([C | Rest], S, Acc) when C >= $\s; C < 16#10FFFF ->
+ tokenize_string(Rest, ?ADV_COL(S, 1), [C | Acc]).
+
+tokenize_number(IoList=[C | _], Mode, S=#decoder{input_encoding=utf8}, Acc)
+ when is_list(C); is_binary(C); C >= 16#7f ->
+ List = xmerl_ucs:from_utf8(list_to_binary(lists:flatten(IoList))),
+ tokenize_number(List, Mode, S#decoder{input_encoding=unicode}, Acc);
+tokenize_number([$- | Rest], sign, S, []) ->
+ tokenize_number(Rest, int, ?INC_COL(S), [$-]);
+tokenize_number(Rest, sign, S, []) ->
+ tokenize_number(Rest, int, S, []);
+tokenize_number([$0 | Rest], int, S, Acc) ->
+ tokenize_number(Rest, frac, ?INC_COL(S), [$0 | Acc]);
+tokenize_number([C | Rest], int, S, Acc) when C >= $1, C =< $9 ->
+ tokenize_number(Rest, int1, ?INC_COL(S), [C | Acc]);
+tokenize_number([C | Rest], int1, S, Acc) when C >= $0, C =< $9 ->
+ tokenize_number(Rest, int1, ?INC_COL(S), [C | Acc]);
+tokenize_number(Rest, int1, S, Acc) ->
+ tokenize_number(Rest, frac, S, Acc);
+tokenize_number([$., C | Rest], frac, S, Acc) when C >= $0, C =< $9 ->
+ tokenize_number(Rest, frac1, ?ADV_COL(S, 2), [C, $. | Acc]);
+tokenize_number([E | Rest], frac, S, Acc) when E == $e; E == $E ->
+ tokenize_number(Rest, esign, ?INC_COL(S), [$e, $0, $. | Acc]);
+tokenize_number(Rest, frac, S, Acc) ->
+ {{int, lists:reverse(Acc)}, Rest, S};
+tokenize_number([C | Rest], frac1, S, Acc) when C >= $0, C =< $9 ->
+ tokenize_number(Rest, frac1, ?INC_COL(S), [C | Acc]);
+tokenize_number([E | Rest], frac1, S, Acc) when E == $e; E == $E ->
+ tokenize_number(Rest, esign, ?INC_COL(S), [$e | Acc]);
+tokenize_number(Rest, frac1, S, Acc) ->
+ {{float, lists:reverse(Acc)}, Rest, S};
+tokenize_number([C | Rest], esign, S, Acc) when C == $-; C == $+ ->
+ tokenize_number(Rest, eint, ?INC_COL(S), [C | Acc]);
+tokenize_number(Rest, esign, S, Acc) ->
+ tokenize_number(Rest, eint, S, Acc);
+tokenize_number([C | Rest], eint, S, Acc) when C >= $0, C =< $9 ->
+ tokenize_number(Rest, eint1, ?INC_COL(S), [C | Acc]);
+tokenize_number([C | Rest], eint1, S, Acc) when C >= $0, C =< $9 ->
+ tokenize_number(Rest, eint1, ?INC_COL(S), [C | Acc]);
+tokenize_number(Rest, eint1, S, Acc) ->
+ {{float, lists:reverse(Acc)}, Rest, S}.
+
+tokenize([], S=#decoder{state=trim}) ->
+ {eof, [], S};
+tokenize([L | Rest], S) when is_list(L) ->
+ tokenize(L ++ Rest, S);
+tokenize([B | Rest], S) when is_binary(B) ->
+ tokenize(xmerl_ucs:from_utf8(B) ++ Rest, S);
+tokenize("\r\n" ++ Rest, S) ->
+ tokenize(Rest, ?INC_LINE(S));
+tokenize("\n" ++ Rest, S) ->
+ tokenize(Rest, ?INC_LINE(S));
+tokenize([C | Rest], S) when C == $\s; C == $\t ->
+ tokenize(Rest, ?INC_COL(S));
+tokenize("{" ++ Rest, S) ->
+ {start_object, Rest, ?INC_COL(S)};
+tokenize("}" ++ Rest, S) ->
+ {end_object, Rest, ?INC_COL(S)};
+tokenize("[" ++ Rest, S) ->
+ {start_array, Rest, ?INC_COL(S)};
+tokenize("]" ++ Rest, S) ->
+ {end_array, Rest, ?INC_COL(S)};
+tokenize("," ++ Rest, S) ->
+ {comma, Rest, ?INC_COL(S)};
+tokenize(":" ++ Rest, S) ->
+ {colon, Rest, ?INC_COL(S)};
+tokenize("null" ++ Rest, S) ->
+ {{const, null}, Rest, ?ADV_COL(S, 4)};
+tokenize("true" ++ Rest, S) ->
+ {{const, true}, Rest, ?ADV_COL(S, 4)};
+tokenize("false" ++ Rest, S) ->
+ {{const, false}, Rest, ?ADV_COL(S, 5)};
+tokenize("\"" ++ Rest, S) ->
+ {String, Rest1, S1} = tokenize_string(Rest, ?INC_COL(S), []),
+ {{const, xmerl_ucs:to_utf8(String)}, Rest1, S1};
+tokenize(L=[C | _], S) when C >= $0, C =< $9; C == $- ->
+ case tokenize_number(L, sign, S, []) of
+ {{int, Int}, Rest, S1} ->
+ {{const, list_to_integer(Int)}, Rest, S1};
+ {{float, Float}, Rest, S1} ->
+ {{const, list_to_float(Float)}, Rest, S1}
+ end.
+
+%% testing constructs borrowed from the Yaws JSON implementation.
+
+%% Create an object from a list of Key/Value pairs.
+
+obj_new() ->
+ {obj, []}.
+
+is_obj({obj, Props}) ->
+ F = fun ({K, _}) when is_list(K) ->
+ true;
+ (_) ->
+ false
+ end,
+ lists:all(F, Props).
+
+obj_from_list(Props) ->
+ Obj = {obj, Props},
+ case is_obj(Obj) of
+ true -> Obj;
+ false -> exit(json_bad_object)
+ end.
+
+%% Test for equivalence of Erlang terms.
+%% Due to arbitrary order of construction, equivalent objects might
+%% compare unequal as erlang terms, so we need to carefully recurse
+%% through aggregates (tuples and objects).
+
+equiv({obj, Props1}, {obj, Props2}) ->
+ equiv_object(Props1, Props2);
+equiv(T1, T2) when is_tuple(T1), is_tuple(T2) ->
+ equiv_list(tuple_to_list(T1), tuple_to_list(T2));
+equiv(N1, N2) when is_number(N1), is_number(N2) -> N1 == N2;
+equiv(S1, S2) when is_list(S1), is_list(S2) -> S1 == S2;
+equiv(true, true) -> true;
+equiv(false, false) -> true;
+equiv(null, null) -> true.
+
+%% Object representation and traversal order is unknown.
+%% Use the sledgehammer and sort property lists.
+
+equiv_object(Props1, Props2) ->
+ L1 = lists:keysort(1, Props1),
+ L2 = lists:keysort(1, Props2),
+ Pairs = lists:zip(L1, L2),
+ true = lists:all(fun({{K1, V1}, {K2, V2}}) ->
+ equiv(K1, K2) and equiv(V1, V2)
+ end, Pairs).
+
+%% Recursively compare tuple elements for equivalence.
+
+equiv_list([], []) ->
+ true;
+equiv_list([V1 | L1], [V2 | L2]) ->
+ case equiv(V1, V2) of
+ true ->
+ equiv_list(L1, L2);
+ false ->
+ false
+ end.
+
+test_all() ->
+ test_one(e2j_test_vec(utf8), 1).
+
+test_one([], N) ->
+ io:format("~p tests passed~n", [N-1]),
+ ok;
+test_one([{E, J} | Rest], N) ->
+ io:format("[~p] ~p ~p~n", [N, E, J]),
+ true = equiv(E, decode(J)),
+ true = equiv(E, decode(encode(E))),
+ test_one(Rest, 1+N).
+
+e2j_test_vec(unicode) ->
+ [
+ {"foo" ++ [500] ++ "bar", [$", $f, $o, $o, 500, $b, $a, $r, $"]}
+ ];
+e2j_test_vec(utf8) ->
+ [
+ {1, "1"},
+ {3.1416, "3.14160"}, % text representation may truncate, trail zeroes
+ {-1, "-1"},
+ {-3.1416, "-3.14160"},
+ {12.0e10, "1.20000e+11"},
+ {1.234E+10, "1.23400e+10"},
+ {-1.234E-10, "-1.23400e-10"},
+ {10.0, "1.0e+01"},
+ {123.456, "1.23456E+2"},
+ {10.0, "1e1"},
+ {"foo", "\"foo\""},
+ {"foo" ++ [5] ++ "bar", "\"foo\\u0005bar\""},
+ {"", "\"\""},
+ {[], "\"\""},
+ {"\n\n\n", "\"\\n\\n\\n\""},
+ {obj_new(), "{}"},
+ {obj_from_list([{"foo", "bar"}]), "{\"foo\":\"bar\"}"},
+ {obj_from_list([{"foo", "bar"}, {"baz", 123}]),
+ "{\"foo\":\"bar\",\"baz\":123}"},
+ {{}, "[]"},
+ {{{}}, "[[]]"},
+ {{1, "foo"}, "[1,\"foo\"]"},
+
+ % json array in a json object
+ {obj_from_list([{"foo", {123}}]),
+ "{\"foo\":[123]}"},
+
+ % json object in a json object
+ {obj_from_list([{"foo", obj_from_list([{"bar", true}])}]),
+ "{\"foo\":{\"bar\":true}}"},
+
+ % fold evaluation order
+ {obj_from_list([{"foo", {}},
+ {"bar", obj_from_list([{"baz", true}])},
+ {"alice", "bob"}]),
+ "{\"foo\":[],\"bar\":{\"baz\":true},\"alice\":\"bob\"}"},
+
+ % json object in a json array
+ {{-123, "foo", obj_from_list([{"bar", {}}]), null},
+ "[-123,\"foo\",{\"bar\":[]},null]"}
+ ].
diff --git a/src/couchdb/couch.app.tpl.in b/src/couchdb/couch.app.tpl.in
new file mode 100644
index 00000000..5ddf0989
--- /dev/null
+++ b/src/couchdb/couch.app.tpl.in
@@ -0,0 +1,29 @@
+{application,couch,
+ [{description,"@package_name@"},
+ {vsn,"@version@"},
+ {modules,[couch_btree,
+ cjson,
+ couch_db,
+ couch_doc,
+ couch_query_servers,
+ couch_file,
+ couch_server,
+ couch_server_sup,
+ couch_stream,
+ couch_key_tree,
+ couch_view,
+ couch_util,
+ mod_couch,
+ couch_event_sup,
+ couch_db_update_notifier,
+ couch_ft_query,
+ couch_log,
+ couch_rep]},
+ {registered,[couch_server,
+ couch_server_sup,
+ couch_util,
+ couch_view,
+ couch_query_servers,
+ couch_ft_query]},
+ {applications,[kernel,stdlib,xmerl,couch_inets]},
+ {mod,{couch_server,[]}}]}.
diff --git a/src/couchdb/couch_btree.erl b/src/couchdb/couch_btree.erl
new file mode 100644
index 00000000..2ae837dd
--- /dev/null
+++ b/src/couchdb/couch_btree.erl
@@ -0,0 +1,590 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_btree).
+
+-export([open/2, open/3, query_modify/4, add_remove/3, foldl/3, foldl/4]).
+-export([foldr/3, foldr/4, fold/4, fold/5, row_count/1]).
+-export([lookup/2, get_state/1, test/1, test/0]).
+
+-define(CHUNK_THRESHOLD, 16#fff).
+
+-record(btree,
+ {fd,
+ root,
+ extract_kv = fun({Key, Value}) -> {Key, Value} end,
+ assemble_kv = fun(Key, Value) -> {Key, Value} end,
+ less = fun(A, B) -> A < B end
+ }).
+
+extract(#btree{extract_kv=Extract}, Value) ->
+ Extract(Value).
+
+assemble(#btree{assemble_kv=Assemble}, Key, Value) ->
+ Assemble(Key, Value).
+
+less(#btree{less=Less}, A, B) ->
+ Less(A, B).
+
+% pass in 'nil' for State when opening a new btree.
+open(State, Fd) ->
+ {ok, #btree{root=State, fd=Fd}}.
+
+set_options(Bt, []) ->
+ Bt;
+set_options(Bt, [{split, Extract}|Rest]) ->
+ set_options(Bt#btree{extract_kv=Extract}, Rest);
+set_options(Bt, [{join, Assemble}|Rest]) ->
+ set_options(Bt#btree{assemble_kv=Assemble}, Rest);
+set_options(Bt, [{less, Less}|Rest]) ->
+ set_options(Bt#btree{less=Less}, Rest).
+
+open(State, Fd, Options) ->
+ {ok, set_options(#btree{root=State, fd=Fd}, Options)}.
+
+get_state(#btree{root=Root}) ->
+ Root.
+
+row_count(#btree{root=nil}) ->
+ 0;
+row_count(#btree{root={_RootPointer, Count}}) ->
+ Count.
+
+foldl(Bt, Fun, Acc) ->
+ fold(Bt, fwd, Fun, Acc).
+
+foldl(Bt, Key, Fun, Acc) ->
+ fold(Bt, Key, fwd, Fun, Acc).
+
+foldr(Bt, Fun, Acc) ->
+ fold(Bt, rev, Fun, Acc).
+
+foldr(Bt, Key, Fun, Acc) ->
+ fold(Bt, Key, rev, Fun, Acc).
+
+% wraps a 2-arity function in the proper 3-arity function
+convert_fun_arity(Fun) when is_function(Fun, 2) ->
+ fun(KV, _Offset, AccIn) -> Fun(KV, AccIn) end;
+convert_fun_arity(Fun) when is_function(Fun, 3) ->
+ Fun. % Already arity 3
+
+fold(Bt, Dir, Fun, Acc) ->
+ {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, nil, Dir, convert_fun_arity(Fun), Acc),
+ {ok, Acc2}.
+
+fold(Bt, Key, Dir, Fun, Acc) ->
+ {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, Key, Dir, convert_fun_arity(Fun), Acc),
+ {ok, Acc2}.
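+
+% Illustrative usage (not part of the original module): collect every
+% key/value pair; the accumulator is built newest-first, so reverse it:
+%
+%   {ok, RevKVs} = couch_btree:foldl(Btree,
+%       fun(KV, Acc) -> {ok, [KV | Acc]} end, []),
+%   KVs = lists:reverse(RevKVs).
+%
+% Returning {stop, Acc} from the fun halts the fold early.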
+
+add_remove(Bt, InsertKeyValues, RemoveKeys) ->
+ {Result, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys),
+ {Result, Bt2}.
+
+query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) ->
+ #btree{root=Root} = Bt,
+ InsertActions = lists:map(
+ fun(KeyValue) ->
+ {Key, Value} = extract(Bt, KeyValue),
+ {insert, Key, Value}
+ end, InsertValues),
+ RemoveActions = [{remove, Key, nil} || Key <- RemoveKeys],
+ FetchActions = [{fetch, Key, nil} || Key <- LookupKeys],
+ SortFun =
+ fun({OpA, A, _}, {OpB, B, _}) ->
+ case less(Bt, A, B) of
+ true -> true;
+ false ->
+ case less(Bt, B, A) of
+ true -> false;
+ false ->
+ % A and B are equal, sort by op.
+ op_order(OpA) < op_order(OpB)
+ end
+ end
+ end,
+ Actions = lists:sort(SortFun, lists:append([InsertActions, RemoveActions, FetchActions])),
+ {ok, KeyPointers, QueryResults, Bt2} = modify_node(Bt, Root, Actions, []),
+ {ok, NewRoot, Bt3} = complete_root(Bt2, KeyPointers),
+ {ok, QueryResults, Bt3#btree{root=NewRoot}}.
+
+% for ordering different operations with the same key.
+% fetch < remove < insert
+op_order(fetch) -> 1;
+op_order(remove) -> 2;
+op_order(insert) -> 3.
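+
+% For example (illustrative): three actions on the same key K sort as
+% [{fetch, K, nil}, {remove, K, nil}, {insert, K, V}], so a fetch in a
+% batch observes the value as it was before that batch's remove/insert.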
+
+lookup(#btree{root=Root, less=Less}=Bt, Keys) ->
+ SortedKeys = lists:sort(Less, Keys),
+ {ok, SortedResults} = lookup(Bt, Root, SortedKeys),
+ % We want to return the results in the same order as the keys were input
+ % but we may have changed the order when we sorted. So we need to put the
+ % order back into the results.
+ KeyDict = dict:from_list(SortedResults),
+ [dict:fetch(Key, KeyDict) || Key <- Keys].
+
+lookup(_Bt, nil, Keys) ->
+ {ok, [{Key, not_found} || Key <- Keys]};
+lookup(Bt, {Pointer, _Count}, Keys) ->
+ {NodeType, NodeList} = get_node(Bt, Pointer),
+ case NodeType of
+ kp_node ->
+ lookup_kpnode(Bt, NodeList, Keys, []);
+ kv_node ->
+ lookup_kvnode(Bt, NodeList, Keys, [])
+ end.
+
+
+lookup_kpnode(_Bt, [], Keys, Output) ->
+ {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])};
+
+lookup_kpnode(_Bt, _KPs, [], Output) ->
+ {ok, lists:reverse(Output)};
+
+lookup_kpnode(Bt, [{Key, PointerInfo} | RestKPs], LookupKeys, Output) ->
+ % Split the lookup keys into two lists: keys less than or equal
+ % to the current key, and keys greater than the current key
+ SplitFun = fun(LookupKey) -> not less(Bt, Key, LookupKey) end,
+ case lists:splitwith(SplitFun, LookupKeys) of
+ {[], GreaterQueries} ->
+ lookup_kpnode(Bt, RestKPs, GreaterQueries, Output);
+ {LessEqQueries, GreaterQueries} ->
+ {ok, Results} = lookup(Bt, PointerInfo, LessEqQueries),
+ lookup_kpnode(Bt, RestKPs, GreaterQueries, lists:reverse(Results, Output))
+ end.
+
+
+
+lookup_kvnode(_Bt, _KVs, [], Output) ->
+ {ok, lists:reverse(Output)};
+lookup_kvnode(_Bt, [], Keys, Output) ->
+ % keys not found
+ {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])};
+lookup_kvnode(Bt, [{Key, Value} | RestKVs], [LookupKey | RestLookupKeys], Output) ->
+ case less(Bt, LookupKey, Key) of
+ true ->
+ lookup_kvnode(Bt, [{Key, Value} | RestKVs], RestLookupKeys, [{LookupKey, not_found} | Output]);
+ false ->
+ case less(Bt, Key, LookupKey) of
+ true ->
+ % LookupKey is greater than Key
+ lookup_kvnode(Bt, RestKVs, [LookupKey | RestLookupKeys], Output);
+ false ->
+ % LookupKey is equal to Key
+ lookup_kvnode(Bt, RestKVs, RestLookupKeys, [{LookupKey, {ok, assemble(Bt, LookupKey, Value)}} | Output])
+ end
+ end.
+
+
+complete_root(Bt, []) ->
+ {ok, nil, Bt};
+complete_root(Bt, [{_Key, PointerInfo}])->
+ {ok, PointerInfo, Bt};
+complete_root(Bt, KPs) ->
+ {ok, ResultKeyPointers, Bt2} = write_node(Bt, kp_node, KPs),
+ complete_root(Bt2, ResultKeyPointers).
+
+%%%%%%%%%%%%% The chunkify function sucks! %%%%%%%%%%%%%
+% It is inaccurate as it does not account for compression when blocks are
+% written. Plus with the "case size(term_to_binary(InList)) of" code it's
+% probably really inefficient.
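+%
+% For example (illustrative): a node list serializing to 10000 bytes with
+% ?CHUNK_THRESHOLD = 16#fff (4095) gives NumberOfChunksLikely =
+% (10000 div 4095) + 1 = 3, so elements are accumulated into chunks of
+% roughly 10000 div 3 = 3333 bytes each.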
+
+chunkify(_Bt, []) ->
+ [];
+chunkify(Bt, InList) ->
+ case size(term_to_binary(InList)) of
+ Size when Size > ?CHUNK_THRESHOLD ->
+ NumberOfChunksLikely = ((Size div ?CHUNK_THRESHOLD) + 1),
+ ChunkThreshold = Size div NumberOfChunksLikely,
+ chunkify(Bt, InList, ChunkThreshold, [], 0, []);
+ _Else ->
+ [InList]
+ end.
+
+chunkify(_Bt, [], _ChunkThreshold, [], 0, OutputChunks) ->
+ lists:reverse(OutputChunks);
+chunkify(_Bt, [], _ChunkThreshold, OutList, _OutListSize, OutputChunks) ->
+ lists:reverse([lists:reverse(OutList) | OutputChunks]);
+chunkify(Bt, [InElement | RestInList], ChunkThreshold, OutList, OutListSize, OutputChunks) ->
+ case size(term_to_binary(InElement)) of
+ Size when (Size + OutListSize) > ChunkThreshold ->
+ chunkify(Bt, RestInList, ChunkThreshold, [], 0, [lists:reverse([InElement | OutList]) | OutputChunks]);
+ Size ->
+ chunkify(Bt, RestInList, ChunkThreshold, [InElement | OutList], OutListSize + Size, OutputChunks)
+ end.
+
+modify_node(Bt, RootPointerInfo, Actions, QueryOutput) ->
+ case RootPointerInfo of
+ nil ->
+ NodeType = kv_node,
+ NodeList = [];
+ {Pointer, _count} ->
+ {NodeType, NodeList} = get_node(Bt, Pointer)
+ end,
+ case NodeType of
+ kp_node ->
+ {ok, NewNodeList, QueryOutput2, Bt2} = modify_kpnode(Bt, NodeList, Actions, [], QueryOutput);
+ kv_node ->
+ {ok, NewNodeList, QueryOutput2, Bt2} = modify_kvnode(Bt, NodeList, Actions, [], QueryOutput)
+ end,
+ case NewNodeList of
+ [] -> % no nodes remain
+ {ok, [], QueryOutput2, Bt2};
+ NodeList -> % nothing changed
+ {LastKey, _LastValue} = lists:last(NodeList),
+ {ok, [{LastKey, RootPointerInfo}], QueryOutput2, Bt2};
+ _Else2 ->
+ {ok, ResultList, Bt3} = write_node(Bt2, NodeType, NewNodeList),
+ {ok, ResultList, QueryOutput2, Bt3}
+ end.
+
+
+count(kv_node, NodeList) ->
+ length(NodeList);
+count(kp_node, NodeList) ->
+ lists:foldl( fun({_Key, {_Pointer, Count}}, AccCount) ->
+ Count + AccCount
+ end,
+ 0, NodeList).
+
+
+get_node(#btree{fd = Fd}, NodePos) ->
+ {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos),
+ case NodeType of
+ kp_node ->
+ % Node pointers always point backward on disk.
+ % Validating this prevents infinite loops should
+ % a disk corruption occur.
+ [throw({error, disk_corruption})
+ || {_Key, {SubNodePos, _Count}}
+ <- NodeList, SubNodePos >= NodePos];
+ kv_node ->
+ ok
+ end,
+ {NodeType, NodeList}.
+
+write_node(Bt, NodeType, NodeList) ->
+ % split up nodes into smaller sizes
+ NodeListList = chunkify(Bt, NodeList),
+ % now write out each chunk and return the KeyPointer pairs for those nodes
+ ResultList = [
+ begin
+ {ok, Pointer} = couch_file:append_term(Bt#btree.fd, {NodeType, ANodeList}),
+ {LastKey, _} = lists:last(ANodeList),
+ {LastKey, {Pointer, count(NodeType, ANodeList)}}
+ end
+ ||
+ ANodeList <- NodeListList
+ ],
+ {ok, ResultList, Bt}.
+
+modify_kpnode(Bt, KPs, [], ResultNode, QueryOutput) ->
+ % processed all queries for the current tree
+ {ok, lists:reverse(ResultNode, KPs), QueryOutput, Bt};
+
+modify_kpnode(Bt, [], Actions, [{_Key, PointerInfo} | ResultNode], QueryOutput) ->
+ {ok, ChildKPs, QueryOutput2, Bt2} = modify_node(Bt, PointerInfo, Actions, QueryOutput),
+ {ok, lists:reverse(ResultNode, ChildKPs), QueryOutput2, Bt2};
+
+modify_kpnode(Bt, [{Key,PointerInfo} | RestKPs], Actions, ResultNode, QueryOutput) ->
+ % Split the actions into two lists: actions with keys less than
+ % or equal to the current key, and actions with keys greater than it
+ SplitFun = fun({_ActionType, ActionKey, _ActionValue}) ->
+ not less(Bt, Key, ActionKey)
+ end,
+ case lists:splitwith(SplitFun, Actions) of
+ {[], GreaterQueries} ->
+ modify_kpnode(Bt, RestKPs, GreaterQueries, [{Key, PointerInfo} | ResultNode], QueryOutput);
+ {LessEqQueries, GreaterQueries} ->
+ {ok, ChildKPs, QueryOutput2, Bt2} = modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput),
+ modify_kpnode(Bt2, RestKPs, GreaterQueries, lists:reverse(ChildKPs, ResultNode), QueryOutput2)
+ end.
+
+modify_kvnode(Bt, KVs, [], ResultNode, QueryOutput) ->
+ {ok, lists:reverse(ResultNode, KVs), QueryOutput, Bt};
+modify_kvnode(Bt, [], [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, QueryOutput) ->
+ case ActionType of
+ insert ->
+ modify_kvnode(Bt, [], RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
+ remove ->
+ % just drop the action
+ modify_kvnode(Bt, [], RestActions, ResultNode, QueryOutput);
+ fetch ->
+ % the key/value must not exist in the tree
+ modify_kvnode(Bt, [], RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput])
+ end;
+modify_kvnode(Bt, [{Key, Value} | RestKVs], [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, QueryOutput) ->
+ case less(Bt, ActionKey, Key) of
+ true ->
+ case ActionType of
+ insert ->
+ % ActionKey is less than the Key, so insert
+ modify_kvnode(Bt, [{Key, Value} | RestKVs], RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
+ remove ->
+ % ActionKey is less than the Key, just drop the action
+ modify_kvnode(Bt, [{Key, Value} | RestKVs], RestActions, ResultNode, QueryOutput);
+ fetch ->
+ % ActionKey is less than the Key, the key/value must not exist in the tree
+ modify_kvnode(Bt, [{Key, Value} | RestKVs], RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput])
+ end;
+ false ->
+ case less(Bt, Key, ActionKey) of
+ true ->
+ % ActionKey is greater than Key
+ modify_kvnode(Bt, RestKVs, [{ActionType, ActionKey, ActionValue} | RestActions], [{Key, Value} | ResultNode], QueryOutput);
+ false ->
+ % ActionKey is equal to Key
+ case ActionType of
+ insert ->
+ % ActionKey is equal to the Key, so replace the value
+ modify_kvnode(Bt, RestKVs, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
+ remove ->
+ modify_kvnode(Bt, RestKVs, RestActions, ResultNode, QueryOutput);
+ fetch ->
+ % ActionKey is equal to the Key, insert into the QueryOutput, but re-process the node
+ % since an identical action key can follow it.
+ modify_kvnode(Bt, [{Key, Value} | RestKVs], RestActions, ResultNode, [{ok, assemble(Bt, Key, Value)} | QueryOutput])
+ end
+ end
+ end.
+
+adjust_dir(fwd, List) ->
+ List;
+adjust_dir(rev, List) ->
+ lists:reverse(List).
+
+stream_node(Bt, Offset, PointerInfo, nil, Dir, Fun, Acc) ->
+ stream_node(Bt, Offset, PointerInfo, Dir, Fun, Acc);
+stream_node(_Bt, _Offset, nil, _StartKey, _Dir, _Fun, Acc) ->
+ {ok, Acc};
+stream_node(Bt, Offset, {Pointer, _Count}, StartKey, Dir, Fun, Acc) ->
+ {NodeType, NodeList} = get_node(Bt, Pointer),
+ case NodeType of
+ kp_node ->
+ stream_kp_node(Bt, Offset, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc);
+ kv_node ->
+ stream_kv_node(Bt, Offset, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc)
+ end.
+
+stream_node(_Bt, _Offset, nil, _Dir, _Fun, Acc) ->
+ {ok, Acc};
+stream_node(Bt, Offset, {Pointer, _Count}, Dir, Fun, Acc) ->
+ {NodeType, NodeList} = get_node(Bt, Pointer),
+ case NodeType of
+ kp_node ->
+ stream_kp_node(Bt, Offset, adjust_dir(Dir, NodeList), Dir, Fun, Acc);
+ kv_node ->
+ stream_kv_node(Bt, Offset, adjust_dir(Dir, NodeList), Dir, Fun, Acc)
+ end.
+
+stream_kp_node(_Bt, _Offset, [], _Dir, _Fun, Acc) ->
+ {ok, Acc};
+stream_kp_node(Bt, Offset, [{_Key, {Pointer, Count}} | Rest], Dir, Fun, Acc) ->
+ case stream_node(Bt, Offset, {Pointer, Count}, Dir, Fun, Acc) of
+ {ok, Acc2} ->
+ stream_kp_node(Bt, Offset + Count, Rest, Dir, Fun, Acc2);
+ {stop, Acc2} ->
+ {stop, Acc2}
+ end.
+
+drop_nodes(_Bt, Offset, _StartKey, []) ->
+ {Offset, []};
+drop_nodes(Bt, Offset, StartKey, [{NodeKey, {Pointer, Count}} | RestKPs]) ->
+ case less(Bt, NodeKey, StartKey) of
+ true -> drop_nodes(Bt, Offset + Count, StartKey, RestKPs);
+ false -> {Offset, [{NodeKey, {Pointer, Count}} | RestKPs]}
+ end.
+
+stream_kp_node(Bt, Offset, KPs, StartKey, Dir, Fun, Acc) ->
+ {NewOffset, NodesToStream} =
+ case Dir of
+ fwd ->
+ % drop all nodes sorting before the key
+ drop_nodes(Bt, Offset, StartKey, KPs);
+ rev ->
+ % keep all nodes sorting before the key, AND the first node to sort after
+ RevKPs = lists:reverse(KPs),
+ case lists:splitwith(fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end, RevKPs) of
+ {_RevBefore, []} ->
+ % everything sorts before it
+ {Offset, KPs};
+ {RevBefore, [FirstAfter | Drop]} ->
+ {Offset + count(kp_node, Drop), [FirstAfter | lists:reverse(RevBefore)]}
+ end
+ end,
+ case NodesToStream of
+ [] ->
+ {ok, Acc};
+ [{_Key, PointerInfo} | Rest] ->
+ case stream_node(Bt, NewOffset, PointerInfo, StartKey, Dir, Fun, Acc) of
+ {ok, Acc2} ->
+ stream_kp_node(Bt, NewOffset, Rest, Dir, Fun, Acc2);
+ {stop, Acc2} ->
+ {stop, Acc2}
+ end
+ end.
+
+stream_kv_node(_Bt, _Offset, [], _Dir, _Fun, Acc) ->
+ {ok, Acc};
+stream_kv_node(Bt, Offset, [{K, V} | RestKVs], Dir, Fun, Acc) ->
+ case Fun(assemble(Bt, K, V), Offset, Acc) of
+ {ok, Acc2} ->
+ stream_kv_node(Bt, Offset + 1, RestKVs, Dir, Fun, Acc2);
+ {stop, Acc2} ->
+ {stop, Acc2}
+ end.
+
+stream_kv_node(Bt, Offset, KVs, StartKey, Dir, Fun, Acc) ->
+ DropFun =
+ case Dir of
+ fwd ->
+ fun({Key, _}) -> less(Bt, Key, StartKey) end;
+ rev ->
+ fun({Key, _}) -> less(Bt, StartKey, Key) end
+ end,
+ % drop all nodes preceding the key
+ GTEKVs = lists:dropwhile(DropFun, KVs),
+ LenSkipped = length(KVs) - length(GTEKVs),
+ stream_kv_node(Bt, Offset + LenSkipped, GTEKVs, Dir, Fun, Acc).
+
+
+
+
+test()->
+ test(1000).
+
+test(N) ->
+ KeyValues = [{random:uniform(), random:uniform()} || _Seq <- lists:seq(1, N)],
+ test_btree(KeyValues), % randomly distributed
+ Sorted = lists:sort(KeyValues),
+ test_btree(Sorted), % sorted regular
+ test_btree(lists:reverse(Sorted)). % sorted reverse
+
+
+test_btree(KeyValues) ->
+ {ok, Fd} = couch_file:open("foo", [create,overwrite]),
+ {ok, Btree} = open(nil, Fd),
+
+ % first dump in all the values in one go
+ {ok, Btree10} = add_remove(Btree, KeyValues, []),
+
+ ok = test_keys(Btree10, KeyValues),
+
+ % remove everything
+ {ok, Btree20} = test_remove(Btree10, KeyValues),
+
+ % make sure it's empty
+ {ok, false} = foldl(Btree20, fun(_X, _Acc) ->
+ {ok, true} % change Acc to 'true'
+ end,
+ false),
+
+ % add everything back one at a time.
+ {ok, Btree30} = test_add(Btree20, KeyValues),
+
+ ok = test_keys(Btree30, KeyValues),
+
+ KeyValuesRev = lists:reverse(KeyValues),
+
+ % remove everything, in reverse order
+ {ok, Btree40} = test_remove(Btree30, KeyValuesRev),
+
+ % make sure it's empty
+ {ok, false} = foldl(Btree40, fun(_X, _Acc) ->
+ {ok, true} % change Acc to 'true'
+ end,
+ false),
+
+
+ {A, B} = every_other(KeyValues),
+
+ % add everything back
+ {ok, Btree50} = test_add(Btree40,KeyValues),
+
+ ok = test_keys(Btree50, KeyValues),
+
+ % remove half the values
+ {ok, Btree60} = test_remove(Btree50, A),
+
+ % verify the remaining
+ ok = test_keys(Btree60, B),
+
+ % add A back
+ {ok, Btree70} = test_add(Btree60, A),
+
+ % verify
+ ok = test_keys(Btree70, KeyValues),
+
+ % remove B
+ {ok, Btree80} = test_remove(Btree70, B),
+
+ % verify the remaining
+ ok = test_keys(Btree80, A),
+
+ ok = couch_file:close(Fd).
+
+
+
+
+every_other(List) ->
+ every_other(List, [], [], 1).
+
+every_other([], AccA, AccB, _Flag) ->
+ {lists:reverse(AccA), lists:reverse(AccB)};
+every_other([H|T], AccA, AccB, 1) ->
+ every_other(T, [H|AccA], AccB, 0);
+every_other([H|T], AccA, AccB, 0) ->
+ every_other(T, AccA, [H|AccB], 1).
+
+test_keys(Btree, List) ->
+ FoldFun =
+ fun(Element, [HAcc|TAcc]) ->
+ Element = HAcc, % must match
+ {ok, TAcc}
+ end,
+ Sorted = lists:sort(List),
+ {ok, []} = foldl(Btree, FoldFun, Sorted),
+ {ok, []} = foldr(Btree, FoldFun, lists:reverse(Sorted)),
+
+ test_lookup(Btree, List).
+
+% Makes sure each key value pair is found in the btree
+test_lookup(_Btree, []) ->
+ ok;
+test_lookup(Btree, [{Key, Value} | Rest]) ->
+ [{ok,{Key, Value}}] = lookup(Btree, [Key]),
+ {ok, []} = foldl(Btree, Key, fun({KeyIn, ValueIn}, []) ->
+ KeyIn = Key,
+ ValueIn = Value,
+ {stop, []}
+ end,
+ []),
+ {ok, []} = foldr(Btree, Key, fun({KeyIn, ValueIn}, []) ->
+ KeyIn = Key,
+ ValueIn = Value,
+ {stop, []}
+ end,
+ []),
+ test_lookup(Btree, Rest).
+
+% removes each key one at a time from the btree
+test_remove(Btree, []) ->
+ {ok, Btree};
+test_remove(Btree, [{Key, _Value} | Rest]) ->
+ {ok, Btree2} = add_remove(Btree,[], [Key]),
+ test_remove(Btree2, Rest).
+
+% adds each key one at a time to the btree
+test_add(Btree, []) ->
+ {ok, Btree};
+test_add(Btree, [KeyValue | Rest]) ->
+ {ok, Btree2} = add_remove(Btree, [KeyValue], []),
+ test_add(Btree2, Rest).
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
new file mode 100644
index 00000000..e567d27b
--- /dev/null
+++ b/src/couchdb/couch_db.erl
@@ -0,0 +1,757 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db).
+-behaviour(gen_server).
+
+-export([open/2,create/2,create/3,get_doc_info/2]).
+-export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
+-export([delete_doc/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]).
+-export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
+-export([start_update_loop/1]).
+-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+-record(db_header,
+ {write_version = 0,
+ last_update_seq = 0,
+ summary_stream_state = nil,
+ docinfo_by_Id_btree_state = nil,
+ docinfo_by_seq_btree_state = nil,
+ local_docs_btree_state = nil,
+ doc_count=0,
+ doc_del_count=0
+ }).
+
+-record(db,
+ {main_pid,
+ update_pid,
+ fd,
+ header = #db_header{},
+ summary_stream,
+ docinfo_by_Id_btree,
+ docinfo_by_seq_btree,
+ local_docs_btree,
+ last_update_seq,
+ doc_count,
+ doc_del_count,
+ name
+ }).
+
+start_link(DbName, Filepath, Options) ->
+ case couch_file:open(Filepath, Options) of
+ {ok, Fd} ->
+ Result = gen_server:start_link(couch_db, {DbName, Fd, Options}, []),
+ unlink(Fd),
+ Result;
+ {error, enoent} ->
+ % couldn't find file
+ {error, not_found};
+ Else ->
+ Else
+ end.
+
+%%% Interface functions %%%
+
+create(Filepath, Options) ->
+ create(Filepath, Filepath, Options).
+
+create(DbName, Filepath, Options) when is_list(Options) ->
+ start_link(DbName, Filepath, [create | Options]).
+
+open(DbName, Filepath) ->
+ start_link(DbName, Filepath, []).
+
+delete_doc(MainPid, Id, Revisions) ->
+ DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
+ {ok, [Result]} = update_docs(MainPid, DeletedDocs, [new_edits]),
+ {ok, Result}.
+
+open_doc(MainPid, IdOrDocInfo) ->
+ open_doc(MainPid, IdOrDocInfo, []).
+
+open_doc(MainPid, Id, Options) ->
+ case open_doc_int(get_db(MainPid), Id, Options) of
+ {ok, #doc{deleted=true}=Doc} ->
+ case lists:member(deleted, Options) of
+ true ->
+ {ok, Doc};
+ false ->
+ {not_found, deleted}
+ end;
+ Else ->
+ Else
+ end.
+
+open_doc_revs(MainPid, Id, Revs, Options) ->
+ open_doc_revs_int(get_db(MainPid), Id, Revs, Options).
+
+get_missing_revs(MainPid, IdRevsList) ->
+ Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
+ FullDocInfoResults = get_full_doc_infos(MainPid, Ids),
+ Results = lists:zipwith(
+ fun({Id, Revs}, FullDocInfoResult) ->
+ case FullDocInfoResult of
+ {ok, #full_doc_info{rev_tree=RevisionTree}} ->
+ {Id, couch_key_tree:find_missing(RevisionTree, Revs)};
+ not_found ->
+ {Id, Revs}
+ end
+ end,
+ IdRevsList, FullDocInfoResults),
+ {ok, Results}.
+
+get_doc_info(Db, Id) ->
+ case get_full_doc_info(Db, Id) of
+ {ok, DocInfo} ->
+ {ok, couch_doc:to_doc_info(DocInfo)};
+ Else ->
+ Else
+ end.
+
+% returns {ok, FullDocInfo} or not_found
+get_full_doc_info(Db, Id) ->
+ [Result] = get_full_doc_infos(Db, [Id]),
+ Result.
+
+
+get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) ->
+ get_full_doc_infos(get_db(MainPid), Ids);
+get_full_doc_infos(#db{}=Db, Ids) ->
+ couch_btree:lookup(Db#db.docinfo_by_Id_btree, Ids).
+
+get_db_info(MainPid) when is_pid(MainPid) ->
+ get_db_info(get_db(MainPid));
+get_db_info(#db{doc_count=Count, doc_del_count=DelCount, last_update_seq=SeqNum}) ->
+ InfoList = [
+ {doc_count, Count},
+ {doc_del_count, DelCount},
+ {last_update_seq, SeqNum}
+ ],
+ {ok, InfoList}.
+
+update_doc(MainPid, Doc, Options) ->
+ {ok, [NewRev]} = update_docs(MainPid, [Doc], Options),
+ {ok, NewRev}.
+
+update_docs(MainPid, Docs) ->
+ update_docs(MainPid, Docs, []).
+
+% group_alike_docs groups the sorted documents into sublist buckets, by id.
+% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
+group_alike_docs(Docs) ->
+ Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
+ group_alike_docs(Sorted, []).
+
+group_alike_docs([], Buckets) ->
+ lists:reverse(Buckets);
+group_alike_docs([Doc|Rest], []) ->
+ group_alike_docs(Rest, [[Doc]]);
+group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
+ [#doc{id=BucketId}|_] = Bucket,
+ case Doc#doc.id == BucketId of
+ true ->
+ % add to existing bucket
+ group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
+ false ->
+ % add to new bucket
+ group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
+ end.
+
+
+prepare_doc_for_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, OldFullDocInfo, LeafRevsDict) ->
+ case PrevRevs of
+ [PrevRev|_] ->
+ case dict:find(PrevRev, LeafRevsDict) of
+ {ok, {Deleted, Sp, DiskRevs}} ->
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ DiskDoc = make_doc(Db, Id, Deleted, Sp, DiskRevs),
+ Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
+ Doc2#doc{revs=[NewRev|DiskRevs]};
+ false ->
+ Doc#doc{revs=[NewRev|DiskRevs]}
+ end;
+ error ->
+ throw(conflict)
+ end;
+ [] ->
+ % new doc, and we have existing revs.
+ OldDocInfo = couch_doc:to_doc_info(OldFullDocInfo),
+ if OldDocInfo#doc_info.deleted ->
+ % existing doc is a deletion
+ % allow this new doc to be a later revision.
+ {_Deleted, _Sp, Revs} = dict:fetch(OldDocInfo#doc_info.rev, LeafRevsDict),
+ Doc#doc{revs=[NewRev|Revs]};
+ true ->
+ throw(conflict)
+ end
+ end.
+
+update_docs(MainPid, Docs, Options) ->
+ Docs2 = lists:map(
+ fun(#doc{id=Id,revs=Revs}=Doc) ->
+ case Id of
+ ?LOCAL_DOC_PREFIX ++ _ ->
+ Rev = case Revs of [] -> 0; [Rev0|_] -> list_to_integer(Rev0) end,
+ Doc#doc{revs=[integer_to_list(Rev + 1)]};
+ _ ->
+ Doc#doc{revs=[integer_to_list(couch_util:rand32()) | Revs]}
+ end
+ end, Docs),
+ DocBuckets = group_alike_docs(Docs2),
+ Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+ Db = get_db(MainPid),
+
+ % first things first, look up the docs by id and get the most recent full doc infos
+
+ ExistingDocs = get_full_doc_infos(Db, Ids),
+
+ DocBuckets2 = lists:zipwith(
+ fun(Bucket, not_found) ->
+ % no existing revs, make sure no old revision is specified.
+ [throw(conflict) || #doc{revs=[_NewRev, _OldRev | _]} <- Bucket],
+ Bucket;
+ (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}) ->
+ Leafs = couch_key_tree:get_all_leafs(OldRevTree),
+ LeafRevsDict = dict:from_list([{Rev, {Deleted, Sp, Revs}} || {Rev, {Deleted, Sp}, Revs} <- Leafs]),
+ [prepare_doc_for_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict) || Doc <- Bucket]
+ end,
+ DocBuckets, ExistingDocs),
+
+ % flush unwritten binaries to disk.
+ DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2],
+
+ case gen_server:call(MainPid, {update_docs, DocBuckets3, Options}) of
+ ok ->
+ % return the new rev ids, in the same order as the input.
+ {ok, [NewRev || #doc{revs=[NewRev|_]} <- Docs2]};
+ Else->
+ throw(Else)
+ end.
+
+save_docs(MainPid, Docs) ->
+ save_docs(MainPid, Docs, []).
+
+save_docs(MainPid, Docs, Options) ->
+ % flush unwritten binaries to disk.
+ Db = get_db(MainPid),
+ DocBuckets = group_alike_docs(Docs),
+ DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ ok = gen_server:call(MainPid, {update_docs, DocBuckets2, Options}).
+
+
+doc_flush_binaries(Doc, Fd) ->
+ % calc size of binaries to write out
+ Bins = Doc#doc.attachments,
+ PreAllocSize = lists:foldl(
+ fun(BinValue, SizeAcc) ->
+ case BinValue of
+ {_Key, {_Type, {Fd0, _StreamPointer, _Len}}} when Fd0 == Fd ->
+ % already written to our file, nothing to write
+ SizeAcc;
+ {_Key, {_Type, {_OtherFd, _StreamPointer, Len}}} ->
+ % written to a different file
+ SizeAcc + Len;
+ {_Key, {_Type, Bin}} when is_binary(Bin) ->
+ SizeAcc + size(Bin)
+ end
+ end,
+ 0, Bins),
+
+ {ok, OutputStream} = couch_stream:open(Fd),
+ ok = couch_stream:ensure_buffer(OutputStream, PreAllocSize),
+
+ NewBins = lists:map(
+ fun({Key, {Type, BinValue}}) ->
+ NewBinValue =
+ case BinValue of
+ {Fd0, StreamPointer, Len} when Fd0 == Fd ->
+ % already written to our file, nothing to write
+ {Fd, StreamPointer, Len};
+ {OtherFd, StreamPointer, Len} ->
+ % written to a different file (or a closed file
+ % instance, which will cause an error)
+ {ok, {NewStreamPointer, Len}, _EndSp} =
+ couch_stream:foldl(OtherFd, StreamPointer, Len,
+ fun(Bin, {BeginPointer, SizeAcc}) ->
+ {ok, Pointer} = couch_stream:write(OutputStream, Bin),
+ case SizeAcc of
+ 0 -> % this was the first write, record the pointer
+ {ok, {Pointer, size(Bin)}};
+ _ ->
+ {ok, {BeginPointer, SizeAcc + size(Bin)}}
+ end
+ end,
+ {{0,0}, 0}),
+ {Fd, NewStreamPointer, Len};
+ Bin when is_binary(Bin), size(Bin) > 0 ->
+ {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
+ {Fd, StreamPointer, size(Bin)}
+ end,
+ {Key, {Type, NewBinValue}}
+ end, Bins),
+
+ {ok, _FinalPos} = couch_stream:close(OutputStream),
+
+ Doc#doc{attachments = NewBins}.
+
+enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) ->
+ Db = get_db(MainPid),
+ couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx).
+
+enum_docs_since(MainPid, SinceSeq, InFun, Acc) ->
+ enum_docs_since(MainPid, SinceSeq, fwd, InFun, Acc).
+
+enum_docs(MainPid, StartId, Direction, InFun, InAcc) ->
+ Db = get_db(MainPid),
+ couch_btree:fold(Db#db.docinfo_by_Id_btree, StartId, Direction, InFun, InAcc).
+
+enum_docs(MainPid, StartId, InFun, Ctx) ->
+ enum_docs(MainPid, StartId, fwd, InFun, Ctx).
+
+close(MainPid) ->
+ Ref = erlang:monitor(process, MainPid),
+ unlink(MainPid),
+ exit(MainPid, normal),
+ receive
+ {'DOWN', Ref, process, MainPid, _Reason} ->
+ ok
+ end.
+
+
+% server functions
+
+init({DbName, Fd, Options}) ->
+ link(Fd),
+ case lists:member(create, Options) of
+ true ->
+ % create a new header and write it to the file
+ Header = #db_header{},
+ ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
+ ok = couch_file:sync(Fd),
+ init_main(DbName, Fd, Header);
+ false ->
+ {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>),
+ init_main(DbName, Fd, Header)
+ end.
+
+btree_by_seq_split(DocInfo) ->
+ #doc_info{
+ id = Id,
+ rev = Rev,
+ update_seq = Seq,
+ summary_pointer = Sp,
+ conflict_revs = Conflicts,
+ deleted_conflict_revs = DelConflicts,
+ deleted = Deleted} = DocInfo,
+ {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}.
+
+btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) ->
+ #doc_info{
+ id = Id,
+ rev = Rev,
+ update_seq = Seq,
+ summary_pointer = Sp,
+ conflict_revs = Conflicts,
+ deleted_conflict_revs = DelConflicts,
+ deleted = Deleted}.
+
+btree_by_name_split(#full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}) ->
+ {Id, {Seq, Tree}}.
+
+btree_by_name_join(Id, {Seq, Tree}) ->
+ #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}.
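+
+% Illustrative round trip (not part of the original source):
+% btree_by_name_split(#full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree})
+% yields {Id, {Seq, Tree}}, and btree_by_name_join(Id, {Seq, Tree})
+% rebuilds the record; init_main below installs these as the id btree's
+% {split, ...} and {join, ...} options so only compact tuples hit disk.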
+
+
+init_main(DbName, Fd, Header) ->
+ {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
+ ok = couch_stream:set_min_buffer(SummaryStream, 10000),
+ {ok, IdBtree} = couch_btree:open(Header#db_header.docinfo_by_Id_btree_state, Fd,
+ [{split, fun(V) -> btree_by_name_split(V) end},
+ {join, fun(K,V) -> btree_by_name_join(K,V) end}] ),
+ {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
+ [{split, fun(V) -> btree_by_seq_split(V) end},
+ {join, fun(K,V) -> btree_by_seq_join(K,V) end}] ),
+ {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
+
+ Db = #db{
+ main_pid=self(),
+ fd=Fd,
+ header=Header,
+ summary_stream = SummaryStream,
+ docinfo_by_Id_btree = IdBtree,
+ docinfo_by_seq_btree = SeqBtree,
+ local_docs_btree = LocalDocsBtree,
+ last_update_seq = Header#db_header.last_update_seq,
+ doc_count = Header#db_header.doc_count,
+ doc_del_count = Header#db_header.doc_del_count,
+ name = DbName
+ },
+
+ UpdatePid = spawn_link(couch_db, start_update_loop, [Db]),
+
+ {ok, Db#db{update_pid=UpdatePid}}.
+
+terminate(_Reason, Db) ->
+ Db#db.update_pid ! close,
+ couch_file:close(Db#db.fd).
+
+handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) ->
+ Updater ! {From, update_docs, DocActions, Options},
+ {noreply, Db};
+handle_call(get_db, _From, Db) ->
+ {reply, {ok, Db}, Db};
+handle_call({db_updated, NewDb}, _From, _OldDb) ->
+ {reply, ok, NewDb}.
+
+
+handle_cast(foo, Main) ->
+ {noreply, Main}.
+
+%%% Internal functions %%%
+
+start_update_loop(Db) ->
+ update_loop(Db#db{update_pid=self()}).
+
+update_loop(Db) ->
+ receive
+ {OrigFrom, update_docs, DocActions, Options} ->
+ case (catch update_docs_int(Db, DocActions, Options)) of
+ {ok, Db2} ->
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ gen_server:reply(OrigFrom, ok),
+ couch_db_update_notifier:notify({updated, Db2#db.name}),
+ update_loop(Db2);
+ conflict ->
+ gen_server:reply(OrigFrom, conflict),
+ update_loop(Db);
+ Error ->
+ exit(Error) % we crashed
+ end;
+ close ->
+ % terminate loop
+ exit(normal)
+ end.
+
+get_db(MainPid) ->
+ {ok, Db} = gen_server:call(MainPid, get_db),
+ Db.
+
+open_doc_revs_int(Db, Id, Revs, Options) ->
+ case get_full_doc_info(Db, Id) of
+ {ok, #full_doc_info{rev_tree=RevTree}} ->
+ {FoundRevs, MissingRevs} =
+ case Revs of
+ all ->
+ {couch_key_tree:get_all_leafs(RevTree), []};
+ _ ->
+ case lists:member(latest, Options) of
+ true ->
+ couch_key_tree:get_key_leafs(RevTree, Revs);
+ false ->
+ couch_key_tree:get(RevTree, Revs)
+ end
+ end,
+ FoundResults =
+ lists:map(fun({Rev, Value, FoundRevPath}) ->
+ case Value of
+ 0 ->
+ % we have the rev in our list but know nothing about it
+ {{not_found, missing}, Rev};
+ {IsDeleted, SummaryPtr} ->
+ {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
+ end
+ end, FoundRevs),
+ Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
+ {ok, Results};
+ not_found when Revs == all ->
+ {ok, []};
+ not_found ->
+ {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
+ end.
+
+open_doc_int(Db, ?LOCAL_DOC_PREFIX ++ _ = Id, _Options) ->
+ case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
+ [{ok, {_, {Rev, BodyData}}}] ->
+ {ok, #doc{id=Id, revs=[integer_to_list(Rev)], body=BodyData}};
+ [not_found] ->
+ {not_found, missing}
+ end;
+open_doc_int(Db, #doc_info{id=Id,rev=Rev,deleted=IsDeleted,summary_pointer=Sp}=DocInfo, Options) ->
+ Doc = make_doc(Db, Id, IsDeleted, Sp, [Rev]),
+ {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}};
+open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
+ #doc_info{deleted=IsDeleted,rev=Rev, summary_pointer=Sp} = DocInfo =
+ couch_doc:to_doc_info(FullDocInfo),
+ {[{_Rev,_Value, Revs}], []} = couch_key_tree:get(RevTree, [Rev]),
+ Doc = make_doc(Db, Id, IsDeleted, Sp, Revs),
+ {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}};
+open_doc_int(Db, Id, Options) ->
+ case get_full_doc_info(Db, Id) of
+ {ok, FullDocInfo} ->
+ open_doc_int(Db, FullDocInfo, Options);
+ not_found ->
+ throw({not_found, missing})
+ end.
+
+doc_meta_info(DocInfo, RevTree, Options) ->
+ case lists:member(revs_info, Options) of
+ false -> [];
+ true ->
+ {[RevPath],[]} =
+ couch_key_tree:get_full_key_paths(RevTree, [DocInfo#doc_info.rev]),
+ [{revs_info, [{Rev, Deleted} || {Rev, {Deleted, _Sp0}} <- RevPath]}]
+ end ++
+ case lists:member(conflicts, Options) of
+ false -> [];
+ true ->
+ case DocInfo#doc_info.conflict_revs of
+ [] -> [];
+ _ -> [{conflicts, DocInfo#doc_info.conflict_revs}]
+ end
+ end ++
+ case lists:member(deleted_conflicts, Options) of
+ false -> [];
+ true ->
+ case DocInfo#doc_info.deleted_conflict_revs of
+ [] -> [];
+ _ -> [{deleted_conflicts, DocInfo#doc_info.deleted_conflict_revs}]
+ end
+ end.
+
+% rev tree functions
+
+doc_to_tree(Doc) ->
+ doc_to_tree(Doc, lists:reverse(Doc#doc.revs)).
+
+doc_to_tree(Doc, [RevId]) ->
+ [{RevId, Doc, []}];
+doc_to_tree(Doc, [RevId | Rest]) ->
+ [{RevId, [], doc_to_tree(Doc, Rest)}].
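+
+% Example (illustrative): a doc with revs ["3", "2", "1"] (newest first) becomes
+% the single-path tree [{"1", [], [{"2", [], [{"3", Doc, []}]}]}], with the
+% document record stored only at the leaf.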
+
+make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
+ {BodyData, BinValues} =
+ case SummaryPointer of
+ nil ->
+ {[], []};
+ _ ->
+ {ok, {BodyData0, BinValues0}} = couch_stream:read_term(Db#db.summary_stream, SummaryPointer),
+ {BodyData0, [{Name, {Type, {Db#db.fd, Sp, Len}}} || {Name, {Type, Sp, Len}} <- BinValues0]}
+ end,
+ #doc{
+ id = Id,
+ revs = RevisionPath,
+ body = BodyData,
+ attachments = BinValues,
+ deleted = Deleted
+ }.
+
+flush_trees(_Db, [], AccFlushedTrees) ->
+ {ok, lists:reverse(AccFlushedTrees)};
+flush_trees(Db, [Unflushed | RestUnflushed], AccFlushed) ->
+ Flushed = couch_key_tree:map(
+ fun(_Rev, Value) ->
+ case Value of
+ #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
+                % this node value is actually an unwritten document summary;
+                % write it to disk.
+
+ % convert bins, removing the FD.
+ % All bins should have been flushed to disk already.
+ Bins = [{BinName, {BinType, BinSp, BinLen}} || {BinName, {BinType, {_Fd, BinSp, BinLen}}} <- Atts],
+ {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+ {IsDeleted, NewSummaryPointer};
+ _ ->
+ Value
+ end
+ end, Unflushed),
+ flush_trees(Db, RestUnflushed, [Flushed | AccFlushed]).
+
+merge_rev_trees(_NoConflicts, [], [], AccNewTrees) ->
+ {ok, lists:reverse(AccNewTrees)};
+merge_rev_trees(NoConflicts, [NewDocs | RestDocsList],
+ [OldTree | RestOldTrees], AccNewTrees) ->
+ UpdatesRevTree = lists:foldl(
+ fun(NewDoc, AccTree) ->
+ couch_key_tree:merge(AccTree, doc_to_tree(NewDoc))
+ end,
+ [], NewDocs),
+ NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree),
+    % only check for newly introduced conflicts when the doc already exists
+    if NoConflicts andalso OldTree /= [] ->
+ OldConflicts = couch_key_tree:count_leafs(OldTree),
+ NewConflicts = couch_key_tree:count_leafs(NewRevTree),
+ if NewConflicts > OldConflicts ->
+ throw(conflict);
+ true -> ok
+ end;
+ true -> ok
+ end,
+ merge_rev_trees(NoConflicts, RestDocsList, RestOldTrees, [NewRevTree | AccNewTrees]).
+
+new_index_entries([], [], Seq, DocCount, DelCount, AccById, AccBySeq) ->
+ {ok, Seq, DocCount, DelCount, AccById, AccBySeq};
+new_index_entries([Id|RestIds], [RevTree|RestTrees], Seq0, DocCount, DelCount, AccById, AccBySeq) ->
+ Seq = Seq0 + 1,
+ FullDocInfo = #full_doc_info{id=Id, update_seq=Seq, rev_tree=RevTree},
+ #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo),
+ {DocCount2, DelCount2} =
+ if Deleted -> {DocCount, DelCount + 1};
+ true -> {DocCount + 1, DelCount}
+ end,
+ new_index_entries(RestIds, RestTrees, Seq, DocCount2, DelCount2, [FullDocInfo|AccById], [DocInfo|AccBySeq]).
+
+update_docs_int(Db, DocsList, Options) ->
+ #db{
+ docinfo_by_Id_btree = DocInfoByIdBTree,
+ docinfo_by_seq_btree = DocInfoBySeqBTree,
+ last_update_seq = LastSeq,
+ doc_count = FullDocCount,
+ doc_del_count = FullDelCount
+ } = Db,
+
+ % separate out the NonRep documents from the rest of the documents
+ {DocsList2, NonRepDocs} = lists:foldl(
+ fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
+ case Id of
+ ?LOCAL_DOC_PREFIX ++ _ when Rest==[] ->
+                % when saving NR (non-replicating) documents, you can only save a single rev
+                {DocsListAcc, [Doc | NonRepDocsAcc]};
+            Id ->
+ {[Docs | DocsListAcc], NonRepDocsAcc}
+ end
+ end, {[], []}, DocsList),
+
+ Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
+
+    % look up the existing documents, if they exist.
+ OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
+ OldDocTrees = lists:map(
+ fun({ok, #full_doc_info{rev_tree=OldRevTree}}) ->
+ OldRevTree;
+ (not_found) ->
+ []
+ end,
+ OldDocLookups),
+
+ {OldCount, OldDelCount} = lists:foldl(
+ fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) ->
+ case couch_doc:to_doc_info(FullDocInfo) of
+ #doc_info{deleted=false} ->
+ {OldCountAcc + 1, OldDelCountAcc};
+ _ ->
+ {OldCountAcc , OldDelCountAcc + 1}
+ end;
+ (not_found, Acc) ->
+ Acc
+ end, {0, 0}, OldDocLookups),
+
+ % Merge the new docs into the revision trees.
+ NoConflicts = lists:member(no_conflicts, Options),
+ {ok, NewRevTrees} = merge_rev_trees(NoConflicts, DocsList2, OldDocTrees, []),
+
+ RemoveSeqs = [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
+
+ % All regular documents are now ready to write.
+
+    % Try to write the local documents first; a conflict might be generated
+ {ok, Db2} = update_local_docs(Db, NonRepDocs),
+
+    % Write out the document summaries (they are stored in the nodes of the rev trees)
+ {ok, FlushedRevTrees} = flush_trees(Db2, NewRevTrees, []),
+
+ {ok, NewSeq, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
+ new_index_entries(Ids, FlushedRevTrees, LastSeq, 0, 0, [], []),
+
+    % and write out the updated indexes to the documents
+ {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
+ {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
+
+ Db3 = Db2#db{
+ docinfo_by_Id_btree = DocInfoByIdBTree2,
+ docinfo_by_seq_btree = DocInfoBySeqBTree2,
+ last_update_seq = NewSeq,
+ doc_count = FullDocCount + NewDocsCount - OldCount,
+ doc_del_count = FullDelCount + NewDelCount - OldDelCount
+ },
+
+ case lists:member(delay_commit, Options) of
+ true ->
+ {ok, Db3};
+ false ->
+ commit_outstanding(Db3)
+ end.
+
+update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
+ Ids = [Id || #doc{id=Id} <- Docs],
+ OldDocLookups = couch_btree:lookup(Btree, Ids),
+ BtreeEntries = lists:zipwith(
+ fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
+ BasedOnRev =
+ case Revs of
+ [] -> 0;
+ [RevStr|_] -> list_to_integer(RevStr) - 1
+ end,
+ OldRev =
+ case OldDocLookup of
+ {ok, {_, {OldRev0, _}}} -> OldRev0;
+ not_found -> 0
+ end,
+ case OldRev == BasedOnRev of
+ true ->
+ case Delete of
+ false -> {update, {Id, {OldRev+1, Body}}};
+ true -> {remove, Id}
+ end;
+ false ->
+ throw(conflict)
+ end
+
+ end, Docs, OldDocLookups),
+
+ BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
+ BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
+
+ {ok, Btree2} =
+ couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
+
+ {ok, Db#db{local_docs_btree = Btree2}}.
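+
+% Example (illustrative): saving #doc{id="_local/x", revs=["2"], body=Body}
+% asserts it was based on rev 1; if the stored entry is {Id, {1, _}}, the
+% update writes {Id, {2, Body}}, otherwise OldRev /= BasedOnRev and a conflict
+% is thrown.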
+
+
+
+commit_outstanding(#db{fd=Fd, header=Header} = Db) ->
+ ok = couch_file:sync(Fd), % commit outstanding data
+ Header2 = Header#db_header{
+ last_update_seq = Db#db.last_update_seq,
+ summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
+ docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
+ docinfo_by_Id_btree_state = couch_btree:get_state(Db#db.docinfo_by_Id_btree),
+ local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
+ doc_count = Db#db.doc_count,
+ doc_del_count = Db#db.doc_del_count
+ },
+ ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2),
+ ok = couch_file:sync(Fd), % commit header to disk
+ Db2 = Db#db{
+ header = Header2
+ },
+ {ok, Db2}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
new file mode 100644
index 00000000..51ee7af2
--- /dev/null
+++ b/src/couchdb/couch_db.hrl
@@ -0,0 +1,56 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-define(LOCAL_DOC_PREFIX, "_local/").
+-define(DESIGN_DOC_PREFIX0, "_design").
+-define(DESIGN_DOC_PREFIX, "_design/").
+
+-define(DEFAULT_ATTACHMENT_CONTENT_TYPE, "application/octet-stream").
+
+-record(doc_info,
+ {
+ id = "",
+ rev = "",
+ update_seq = 0,
+ summary_pointer = nil,
+ conflict_revs = [],
+ deleted_conflict_revs = [],
+ deleted = false
+ }).
+
+-record(full_doc_info,
+ {id = "",
+ update_seq = 0,
+ rev_tree = []
+ }).
+
+-record(doc,
+ {
+ id = "",
+ revs = [], % in the form [{RevId, IsAvailable}, ...]
+
+ % the json body object.
+ body = {obj, []},
+
+ % each attachment contains:
+ % {data, Type, <<binary>>}
+ % or:
+ % {pointer, Type, {FileHandle, StreamPointer, Length}}
+ attachments = [],
+
+ deleted = false,
+
+ % key/value tuple of meta information, provided when using special options:
+ % couch_db:open_doc(Db, Id, Options).
+ meta = []
+ }).
+
diff --git a/src/couchdb/couch_db_update_notifier.erl b/src/couchdb/couch_db_update_notifier.erl
new file mode 100644
index 00000000..96354620
--- /dev/null
+++ b/src/couchdb/couch_db_update_notifier.erl
@@ -0,0 +1,66 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%
+% This module causes an OS process to be spawned, and that process is notified
+% every time a database is updated.
+%
+% The notifications are in the form of the database name, sent as a line of
+% text to the OS process's standard input.
+%
+
+-module(couch_db_update_notifier).
+
+-behaviour(gen_event).
+
+-export([start_link/1, notify/1]).
+-export([init/1, terminate/2, handle_event/2, handle_call/2, handle_info/2, code_change/3,stop/1]).
+
+-define(ERR_HANDLE, {Port, {exit_status, Status}} -> {stop, {unknown_error, Status}, {unknown_error, Status}, Port}).
+
+start_link(Exec) ->
+ couch_event_sup:start_link(couch_db_update, {couch_db_update_notifier, make_ref()}, Exec).
+
+notify(Event) ->
+ gen_event:notify(couch_db_update, Event).
+
+stop(Pid) ->
+ couch_event_sup:stop(Pid).
+
+init(Exec) when is_list(Exec) -> % an executable
+ Port = open_port({spawn, Exec}, [stream, exit_status, hide]),
+ {ok, Port};
+init(Else) ->
+ {ok, Else}.
+
+terminate(_Reason, _Port) ->
+ ok.
+
+handle_event(Event, Fun) when is_function(Fun, 1) ->
+ Fun(Event),
+ {ok, Fun};
+handle_event(Event, {Fun, FunAcc}) ->
+ FunAcc2 = Fun(Event, FunAcc),
+ {ok, {Fun, FunAcc2}};
+handle_event({EventAtom, DbName}, Port) ->
+ Obj = {obj, [{type, atom_to_list(EventAtom)}, {db, DbName}]},
+ true = port_command(Port, cjson:encode(Obj) ++ "\n"),
+ {ok, Port}.
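+
+% Example (illustrative): notify({updated, "mydb"}) reaches the port clause
+% above and writes one JSON line to the spawned process's standard input, e.g.
+%   {"type":"updated","db":"mydb"}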
+
+handle_call(_Request, State) ->
+ {ok, ok, State}.
+
+handle_info({'EXIT', _, _Reason}, _Port) ->
+ remove_handler.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
new file mode 100644
index 00000000..a9ef55f7
--- /dev/null
+++ b/src/couchdb/couch_doc.erl
@@ -0,0 +1,199 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_doc).
+
+-export([get_view_functions/1, is_special_doc/1,to_doc_info/1]).
+-export([bin_foldl/3,bin_size/1,bin_to_binary/1]).
+-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
+
+-include("couch_db.hrl").
+
+to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs=Revs,meta=Meta}=Doc,Options)->
+ {obj, [{"_id", Id}] ++
+ case Revs of
+ [] -> [];
+ _ -> [{"_rev", lists:nth(1, Revs)}]
+ end ++
+ case Del of
+ false ->
+ {obj, BodyProps} = Body,
+ BodyProps;
+ true ->
+ [{"_deleted", true}]
+ end ++
+ case lists:member(revs, Options) of
+ false -> [];
+ true ->
+ [{"_revs", list_to_tuple(Revs)}]
+ end ++
+ lists:map(
+ fun({revs_info, RevsInfo}) ->
+ JsonRevsInfo =
+ [{obj, [{rev, Rev}, {status, atom_to_list(Status)}]} ||
+ {Rev, Status} <- RevsInfo],
+ {"_revs_info", list_to_tuple(JsonRevsInfo)};
+ ({conflicts, Conflicts}) ->
+ {"_conflicts", list_to_tuple(Conflicts)};
+ ({deleted_conflicts, Conflicts}) ->
+ {"_deleted_conflicts", list_to_tuple(Conflicts)}
+ end, Meta) ++
+ case lists:member(attachments, Options) of
+ true -> % return the full rev list and the binaries as strings.
+ BinProps = lists:map(
+ fun({Name, {Type, BinValue}}) ->
+ {Name, {obj, [{"content-type", Type},
+ {"data", couch_util:encodeBase64(bin_to_binary(BinValue))}]}}
+ end,
+ Doc#doc.attachments),
+ case BinProps of
+ [] -> [];
+ _ -> [{"_attachments", {obj, BinProps}}]
+ end;
+ false ->
+ BinProps = lists:map(
+ fun({Name, {Type, BinValue}}) ->
+ {Name, {obj, [{"stub", true}, {"content-type", Type},
+ {"length", bin_size(BinValue)}]}}
+ end,
+ Doc#doc.attachments),
+ case BinProps of
+ [] -> [];
+ _ -> [{"_attachments", {obj, BinProps}}]
+ end
+ end
+ }.
+
+from_json_obj({obj, Props}) ->
+ {obj,JsonBins} = proplists:get_value("_attachments", Props, {obj, []}),
+ Bins = lists:flatmap(fun({Name, {obj, BinProps}}) ->
+ case proplists:get_value("stub", BinProps) of
+ true ->
+ [{Name, stub}];
+ _ ->
+ Value = proplists:get_value("data", BinProps),
+ Type = proplists:get_value("content-type", BinProps,
+ ?DEFAULT_ATTACHMENT_CONTENT_TYPE),
+ [{Name, {Type, couch_util:decodeBase64(Value)}}]
+ end
+ end, JsonBins),
+ AllowedSpecialMembers = ["id", "revs", "rev", "attachments", "revs_info",
+ "conflicts", "deleted_conflicts", "deleted"],
+ [case lists:member(Name, AllowedSpecialMembers) of
+ true ->
+ ok;
+ false ->
+ throw({doc_validation, io_lib:format("Bad special document member: _~s", [Name])})
+ end
+ || {[$_|Name], _Value} <- Props],
+ Revs =
+ case tuple_to_list(proplists:get_value("_revs", Props, {})) of
+ [] ->
+ case proplists:get_value("_rev", Props) of
+ undefined -> [];
+ Rev -> [Rev]
+ end;
+ Revs0 ->
+ Revs0
+ end,
+ #doc{
+ id = proplists:get_value("_id", Props, ""),
+ revs = Revs,
+ deleted = proplists:get_value("_deleted", Props, false),
+ body = {obj, [{Key, Value} || {[FirstChar|_]=Key, Value} <- Props, FirstChar /= $_]},
+ attachments = Bins
+ }.
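+
+% Example (illustrative):
+%   from_json_obj({obj, [{"_id", "foo"}, {"_rev", "1"}, {"a", 1}]})
+% yields #doc{id="foo", revs=["1"], body={obj, [{"a", 1}]}}; underscore-prefixed
+% members are validated and stripped from the body.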
+
+
+to_doc_info(#full_doc_info{id=Id,update_seq=Seq,rev_tree=Tree}) ->
+ LeafRevs = couch_key_tree:get_all_leafs(Tree),
+ SortedLeafRevs =
+ lists:sort(fun({RevIdA, {IsDeletedA, _}, PathA}, {RevIdB, {IsDeletedB, _}, PathB}) ->
+ % sort descending by {not deleted, then Depth, then RevisionId}
+ A = {not IsDeletedA, length(PathA), RevIdA},
+ B = {not IsDeletedB, length(PathB), RevIdB},
+ A > B
+ end,
+ LeafRevs),
+
+ [{RevId, {IsDeleted, SummaryPointer}, _Path} | Rest] = SortedLeafRevs,
+
+ {ConflictRevTuples, DeletedConflictRevTuples} =
+ lists:splitwith(fun({_ConflictRevId, {IsDeleted1, _SummaryPointer}, _}) ->
+ not IsDeleted1
+ end, Rest),
+
+ ConflictRevs = [RevId1 || {RevId1, _, _} <- ConflictRevTuples],
+ DeletedConflictRevs = [RevId2 || {RevId2, _, _} <- DeletedConflictRevTuples],
+
+ #doc_info{
+ id=Id,
+ update_seq=Seq,
+ rev = RevId,
+ summary_pointer = SummaryPointer,
+ conflict_revs = ConflictRevs,
+ deleted_conflict_revs = DeletedConflictRevs,
+ deleted = IsDeleted
+ }.
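+
+% Example (illustrative): given two leaf revs at depth 2, a live "2a" and a
+% deleted "2b", "2a" sorts first and becomes the doc_info rev, while "2b"
+% lands in deleted_conflict_revs.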
+
+is_special_doc(?DESIGN_DOC_PREFIX ++ _ ) ->
+ true;
+is_special_doc(#doc{id=Id}) ->
+ is_special_doc(Id);
+is_special_doc(_) ->
+ false.
+
+bin_foldl(Bin, Fun, Acc) when is_binary(Bin) ->
+ case Fun(Bin, Acc) of
+ {ok, Acc2} -> {ok, Acc2};
+ {done, Acc2} -> {ok, Acc2}
+ end;
+bin_foldl({Fd, Sp, Len}, Fun, Acc) ->
+ {ok, Acc2, _Sp2} = couch_stream:foldl(Fd, Sp, Len, Fun, Acc),
+ {ok, Acc2}.
+
+bin_size(Bin) when is_binary(Bin) ->
+ size(Bin);
+bin_size({_Fd, _Sp, Len}) ->
+ Len.
+
+bin_to_binary(Bin) when is_binary(Bin) ->
+ Bin;
+bin_to_binary({Fd, Sp, Len}) ->
+ {ok, Bin, _Sp2} = couch_stream:read(Fd, Sp, Len),
+ Bin.
+
+get_view_functions(#doc{body={obj, Fields}}) ->
+ Lang = proplists:get_value("language", Fields, "text/javascript"),
+ {obj, Views} = proplists:get_value("views", Fields, {obj, []}),
+ {Lang, [{ViewName, Value} || {ViewName, Value} <- Views, is_list(Value)]};
+get_view_functions(_Doc) ->
+ none.
+
+has_stubs(#doc{attachments=Bins}) ->
+ has_stubs(Bins);
+has_stubs([]) ->
+ false;
+has_stubs([{_Name, stub}|_]) ->
+ true;
+has_stubs([_Bin|Rest]) ->
+ has_stubs(Rest).
+
+merge_stubs(#doc{attachments=MemBins}=StubsDoc, #doc{attachments=DiskBins}) ->
+ BinDict = dict:from_list(DiskBins),
+ MergedBins = lists:map(
+ fun({Name, stub}) ->
+ {Name, dict:fetch(Name, BinDict)};
+ ({Name, Value}) ->
+ {Name, Value}
+ end, MemBins),
+ StubsDoc#doc{attachments= MergedBins}.
diff --git a/src/couchdb/couch_erl_driver.c b/src/couchdb/couch_erl_driver.c
new file mode 100644
index 00000000..b5703f09
--- /dev/null
+++ b/src/couchdb/couch_erl_driver.c
@@ -0,0 +1,160 @@
+/*
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of the
+License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed
+under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+
+*/
+
+// This file is the C port driver for Erlang. It provides a low-overhead
+// means of calling into C code; however, unlike the Fabric engine, coding
+// errors in this module can crash the entire Erlang server.
+
+#include "erl_driver.h"
+#include "unicode/ucol.h"
+#include "unicode/ucasemap.h"
+#ifndef WIN32
+#include <string.h> // for memcpy
+#endif
+
+typedef struct {
+ ErlDrvPort port;
+ UCollator* collNoCase;
+ UCollator* coll;
+} couch_drv_data;
+
+static void couch_drv_stop(ErlDrvData data)
+{
+ couch_drv_data* pData = (couch_drv_data*)data;
+ if (pData->coll) {
+ ucol_close(pData->coll);
+ }
+ if (pData->collNoCase) {
+ ucol_close(pData->collNoCase);
+ }
+ driver_free((char*)pData);
+}
+
+static ErlDrvData couch_drv_start(ErlDrvPort port, char *buff)
+{
+ UErrorCode status = U_ZERO_ERROR;
+ couch_drv_data* pData = (couch_drv_data*)driver_alloc(sizeof(couch_drv_data));
+
+ if (pData == NULL)
+ return ERL_DRV_ERROR_GENERAL;
+
+ pData->port = port;
+ pData->coll = NULL;
+ pData->collNoCase = NULL;
+ pData->coll = ucol_open("", &status);
+
+ if (U_FAILURE(status)) {
+ couch_drv_stop((ErlDrvData)pData);
+ return ERL_DRV_ERROR_GENERAL;
+ }
+
+ pData->collNoCase = ucol_open("", &status);
+ if (U_FAILURE(status)) {
+ couch_drv_stop((ErlDrvData)pData);
+ return ERL_DRV_ERROR_GENERAL;
+ }
+
+ ucol_setAttribute(pData->collNoCase, UCOL_STRENGTH, UCOL_PRIMARY, &status);
+ if (U_FAILURE(status)) {
+ couch_drv_stop((ErlDrvData)pData);
+ return ERL_DRV_ERROR_GENERAL;
+ }
+
+ return (ErlDrvData)pData;
+}
+
+static int return_control_result(void* pLocalResult, int localLen, char **ppRetBuf, int returnLen)
+{
+ if (*ppRetBuf == NULL || localLen > returnLen) {
+ *ppRetBuf = (char*)driver_alloc_binary(localLen);
+ if(*ppRetBuf == NULL) {
+ return -1;
+ }
+ }
+ memcpy(*ppRetBuf, pLocalResult, localLen);
+ return localLen;
+}
+
+static int couch_drv_control(ErlDrvData drv_data, unsigned int command, const char *pBuf,
+ int bufLen, char **rbuf, int rlen)
+{
+ #define COLLATE 0
+ #define COLLATE_NO_CASE 1
+
+ couch_drv_data* pData = (couch_drv_data*)drv_data;
+
+ UErrorCode status = U_ZERO_ERROR;
+ int collResult;
+ char response;
+ UCharIterator iterA;
+ UCharIterator iterB;
+ int32_t length;
+
+    // Two strings are in the buffer, consecutively. Each string begins with a
+    // 32 bit integer byte length, then the actual string bytes follow.
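+    // For example (illustrative): comparing "abc" with "ab", a little-endian
+    // host would lay the buffer out as:
+    //   03 00 00 00 'a' 'b' 'c'  02 00 00 00 'a' 'b'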
+
+ // first 32bits are the length
+ memcpy(&length, pBuf, sizeof(length));
+ pBuf += sizeof(length);
+
+ // point the iterator at it.
+ uiter_setUTF8(&iterA, pBuf, length);
+
+ pBuf += length; // now on to string b
+
+ // first 32bits are the length
+ memcpy(&length, pBuf, sizeof(length));
+ pBuf += sizeof(length);
+
+ // point the iterator at it.
+ uiter_setUTF8(&iterB, pBuf, length);
+
+ if (command == COLLATE)
+ collResult = ucol_strcollIter(pData->coll, &iterA, &iterB, &status);
+ else if (command == COLLATE_NO_CASE)
+ collResult = ucol_strcollIter(pData->collNoCase, &iterA, &iterB, &status);
+ else
+ return -1;
+
+ if (collResult < 0)
+ response = 0; //lt
+ else if (collResult > 0)
+ response = 1; //gt
+ else
+ response = 2; //eq
+
+ return return_control_result(&response, sizeof(response), rbuf, rlen);
+}
+
+ErlDrvEntry couch_driver_entry = {
+ NULL, /* F_PTR init, N/A */
+ couch_drv_start, /* L_PTR start, called when port is opened */
+ couch_drv_stop, /* F_PTR stop, called when port is closed */
+ NULL, /* F_PTR output, called when erlang has sent */
+ NULL, /* F_PTR ready_input, called when input descriptor ready */
+ NULL, /* F_PTR ready_output, called when output descriptor ready */
+ "couch_erl_driver", /* char *driver_name, the argument to open_port */
+ NULL, /* F_PTR finish, called when unloaded */
+ NULL, /* Not used */
+ couch_drv_control, /* F_PTR control, port_command callback */
+ NULL, /* F_PTR timeout, reserved */
+ NULL /* F_PTR outputv, reserved */
+};
+
+DRIVER_INIT(couch_erl_driver) /* must match name in driver_entry */
+{
+ return &couch_driver_entry;
+}
diff --git a/src/couchdb/couch_event_sup.erl b/src/couchdb/couch_event_sup.erl
new file mode 100644
index 00000000..72b17e5d
--- /dev/null
+++ b/src/couchdb/couch_event_sup.erl
@@ -0,0 +1,69 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% The purpose of this module is to allow event handlers to participate in
+%% Erlang supervisor trees. It provides a monitorable process that crashes if
+%% the event handler fails. The process, when shut down, deregisters the event
+%% handler.
+
+-module(couch_event_sup).
+-behaviour(gen_server).
+
+-include("couch_db.hrl").
+
+-export([start_link/3,start_link/4, stop/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
+
+%
+% Instead of calling
+% ok = gen_event:add_sup_handler(error_logger, my_log, Args)
+%
+% do this:
+% {ok, LinkedPid} = couch_event_sup:start_link(error_logger, my_log, Args)
+%
+% The benefit is that the event handler is now part of the process tree, and
+% can be started, restarted and shut down consistently like the rest of the
+% server components.
+%
+% And now if the "event" crashes, the supervisor is notified and can restart
+% the event handler.
+%
+% Use this form for a named process:
+% {ok, LinkedPid} = couch_event_sup:start_link({local, my_log}, error_logger, my_log, Args)
+%
+
+start_link(EventMgr, EventHandler, Args) ->
+ gen_server:start_link(couch_event_sup, {EventMgr, EventHandler, Args}, []).
+
+start_link(ServerName, EventMgr, EventHandler, Args) ->
+ gen_server:start_link(ServerName, couch_event_sup, {EventMgr, EventHandler, Args}, []).
+
+stop(Pid) ->
+ gen_server:cast(Pid, stop).
+
+init({EventMgr, EventHandler, Args}) ->
+ ok = gen_event:add_sup_handler(EventMgr, EventHandler, Args),
+ {ok, {EventMgr, EventHandler}}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+handle_call(_Whatever, _From, State) ->
+    {reply, ok, State}.
+
+handle_cast(stop, State) ->
+ {stop, normal, State}.
+
+handle_info({gen_event_EXIT, _Handler, Reason}, State) ->
+ {stop, Reason, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
new file mode 100644
index 00000000..6cbad44a
--- /dev/null
+++ b/src/couchdb/couch_file.erl
@@ -0,0 +1,323 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_file).
+-behaviour(gen_server).
+
+-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
+
+-export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
+-export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
+
+%%----------------------------------------------------------------------
+%% Args: Valid Options are [create] and [create,overwrite].
+%% Files are opened in read/write mode.
+%% Returns: On success, {ok, Fd}
+%% or {error, Reason} if the file could not be opened.
+%%----------------------------------------------------------------------
+
+open(Filepath) ->
+ open(Filepath, []).
+
+open(Filepath, Options) ->
+ case gen_server:start_link(couch_file, {Filepath, Options, self()}, []) of
+ {ok, FdPid} ->
+ % we got back an ok, but that doesn't really mean it was successful.
+ % Instead the true status has been sent back to us as a message.
+ % We do this because if the gen_server doesn't initialize properly,
+ % it generates a crash report that will get logged. This avoids
+ % that mess, because we don't want crash reports generated
+ % every time a file cannot be found.
+ receive
+ {FdPid, ok} ->
+ {ok, FdPid};
+ {FdPid, Error} ->
+ Error
+ end;
+ Error ->
+ Error
+ end.
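+
+% Example usage (an illustrative sketch; the path is hypothetical):
+%
+%   {ok, Fd} = couch_file:open("/tmp/example.couch", [create, overwrite]),
+%   {ok, Pos} = couch_file:append_term(Fd, {foo, 1}),
+%   {ok, {foo, 1}} = couch_file:pread_term(Fd, Pos),
+%   ok = couch_file:close(Fd).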
+
+
+%%----------------------------------------------------------------------
+%% Args: Pos is the offset from the beginning of the file, Bytes is
+%%    the number of bytes to read.
+%% Returns: {ok, Binary} where Binary is the binary data read from disk
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pread(Fd, Pos, Bytes) when Bytes > 0 ->
+ gen_server:call(Fd, {pread, Pos, Bytes}).
+
+
+%%----------------------------------------------------------------------
+%% Args: Pos is the offset from the beginning of the file, Bin is
+%%    the binary to write
+%% Returns: ok
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pwrite(Fd, Pos, Bin) ->
+ gen_server:call(Fd, {pwrite, Pos, Bin}).
+
+%%----------------------------------------------------------------------
+%% Purpose: To append a segment of zeros to the end of the file.
+%% Args: Bytes is the number of bytes to append to the file.
+%% Returns: {ok, Pos} where Pos is the file offset to the beginning of
+%% the new segments.
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+expand(Fd, Bytes) when Bytes > 0 ->
+ gen_server:call(Fd, {expand, Bytes}).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: To append an Erlang term to the end of the file.
+%% Args: Erlang term to serialize and append to the file.
+%% Returns: {ok, Pos} where Pos is the file offset to the beginning of the
+%%    serialized term. Use pread_term to read the term back.
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+append_term(Fd, Term) ->
+ gen_server:call(Fd, {append_term, Term}).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: Reads a term from a file that was written with append_term
+%% Args: Pos, the offset into the file where the term is serialized.
+%% Returns: {ok, Term}
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pread_term(Fd, Pos) ->
+ gen_server:call(Fd, {pread_term, Pos}).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: The length of a file, in bytes.
+%% Returns: {ok, Bytes}
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+bytes(Fd) ->
+ gen_server:call(Fd, bytes).
+
+%%----------------------------------------------------------------------
+%% Purpose: Truncate a file to the given length, in bytes.
+%% Returns: ok
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+truncate(Fd, Pos) ->
+ gen_server:call(Fd, {truncate, Pos}).
+
+%%----------------------------------------------------------------------
+%% Purpose: Ensure all bytes written to the file are flushed to disk.
+%% Returns: ok
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+sync(Fd) ->
+ gen_server:call(Fd, sync).
+
+%%----------------------------------------------------------------------
+%% Purpose: Close the file. The close is performed asynchronously.
+%% Returns: ok
+%%----------------------------------------------------------------------
+close(Fd) ->
+ gen_server:cast(Fd, close).
+
+
+write_header(Fd, Prefix, Data) ->
+    % Prefix holds the leading bytes of every db file (the sig and the file
+    % version); TermBin is the actual header data.
+    TermBin = term_to_binary(Data),
+ % the size of all the bytes written to the header, including the md5 signature (16 bytes)
+ FilledSize = size(Prefix) + size(TermBin) + 16,
+ case FilledSize > ?HEADER_SIZE of
+ true ->
+ % too big!
+ {error, error_header_too_large};
+ false ->
+ % pad out the header with zeros, then take the md5 hash
+ PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize))>>,
+ Sig = erlang:md5([TermBin, PadZeros]),
+ % now we assemble the final header binary and write to disk
+ WriteBin = <<Prefix/binary, TermBin/binary, PadZeros/binary, Sig/binary>>,
+ ?HEADER_SIZE = size(WriteBin), % sanity check
+ DblWriteBin = [WriteBin, WriteBin],
+ ok = pwrite(Fd, 0, DblWriteBin)
+ end.
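+
+% Worked example (illustrative): with the 4-byte prefix couch_db uses and a
+% 100-byte TermBin, FilledSize = 4 + 100 + 16 = 120, so 1928 bytes of zero
+% padding bring each of the two header copies up to ?HEADER_SIZE (2048).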
+
+
+read_header(Fd, Prefix) ->
+ {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
+ <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
+ % read the first header
+ case extract_header(Prefix, Bin1) of
+ {ok, Header1} ->
+ case extract_header(Prefix, Bin2) of
+ {ok, Header2} ->
+ case Header1 == Header2 of
+ true ->
+ % Everything is completely normal!
+ {ok, Header1};
+ false ->
+ % To get here we must have two different header versions with signatures intact.
+ % It's weird but possible (a commit failure right at the 2k boundary). Log it and take the first.
+ couch_log:info("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]),
+ {ok, Header1}
+ end;
+ {error, Error} ->
+ % error reading second header. It's ok, but log it.
+ couch_log:info("Secondary header corruption (error: ~p). Using primary header.", [Error]),
+ {ok, Header1}
+ end;
+ {error, Error} ->
+ % error reading primary header
+ case extract_header(Prefix, Bin2) of
+ {ok, Header2} ->
+ % log corrupt primary header. It's ok since the secondary is still good.
+ couch_log:info("Primary header corruption (error: ~p). Using secondary header.", [Error]),
+ {ok, Header2};
+ _ ->
+ % error reading secondary header too
+ % return the error, no need to log anything as the caller will be responsible for dealing with the error.
+ {error, Error}
+ end
+ end.
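+
+% Example round-trip (illustrative; the prefix shown is the one couch_db uses):
+%
+%   ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, {my_header, 1}),
+%   {ok, {my_header, 1}} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>).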
+
+
+extract_header(Prefix, Bin) ->
+ SizeOfPrefix = size(Prefix),
+ SizeOfTermBin = ?HEADER_SIZE -
+ SizeOfPrefix -
+ 16, % md5 sig
+
+ <<HeaderPrefix:SizeOfPrefix/binary, TermBin:SizeOfTermBin/binary, Sig:16/binary>> = Bin,
+
+ % check the header prefix
+ case HeaderPrefix of
+ Prefix ->
+ % check the integrity signature
+ case erlang:md5(TermBin) == Sig of
+ true ->
+ Header = binary_to_term(TermBin),
+ {ok, Header};
+ false ->
+ {error, header_corrupt}
+ end;
+ _ ->
+ {error, unknown_header_type}
+ end.
+
+
+
+init_status_ok(ReturnPid, Fd) ->
+ ReturnPid ! {self(), ok}, % signal back ok
+ {ok, Fd}.
+
+init_status_error(ReturnPid, Error) ->
+ ReturnPid ! {self(), Error}, % signal back error status
+ self() ! self_close, % tell ourself to close async
+ {ok, nil}.
+
+% server functions
+
+init({Filepath, Options, ReturnPid}) ->
+ case lists:member(create, Options) of
+ true ->
+ filelib:ensure_dir(Filepath),
+ case file:open(Filepath, [read, write, raw, binary]) of
+ {ok, Fd} ->
+ {ok, Length} = file:position(Fd, eof),
+ case Length > 0 of
+ true ->
+ % this means the file already exists and has data.
+                % FYI: We don't differentiate between empty files and non-existent
+ % files here.
+ case lists:member(overwrite, Options) of
+ true ->
+ {ok, 0} = file:position(Fd, 0),
+ ok = file:truncate(Fd),
+ init_status_ok(ReturnPid, Fd);
+ false ->
+ ok = file:close(Fd),
+ init_status_error(ReturnPid, {error, file_exists})
+ end;
+ false ->
+ init_status_ok(ReturnPid, Fd)
+ end;
+ Error ->
+ init_status_error(ReturnPid, Error)
+ end;
+ false ->
+ % open in read mode first, so we don't create the file if it doesn't exist.
+ case file:open(Filepath, [read, raw]) of
+ {ok, Fd_Read} ->
+ {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
+ ok = file:close(Fd_Read),
+ init_status_ok(ReturnPid, Fd);
+ Error ->
+ init_status_error(ReturnPid, Error)
+ end
+ end.
+
+
+terminate(_Reason, nil) ->
+ ok;
+terminate(_Reason, Fd) ->
+ file:close(Fd),
+ ok.
+
+
+handle_call({pread, Pos, Bytes}, _From, Fd) ->
+ {reply, file:pread(Fd, Pos, Bytes), Fd};
+handle_call({pwrite, Pos, Bin}, _From, Fd) ->
+ {reply, file:pwrite(Fd, Pos, Bin), Fd};
+handle_call({expand, Num}, _From, Fd) ->
+ {ok, Pos} = file:position(Fd, eof),
+ {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd};
+handle_call(bytes, _From, Fd) ->
+ {reply, file:position(Fd, eof), Fd};
+handle_call(sync, _From, Fd) ->
+ {reply, file:sync(Fd), Fd};
+handle_call({truncate, Pos}, _From, Fd) ->
+ {ok, Pos} = file:position(Fd, Pos),
+ {reply, file:truncate(Fd), Fd};
+handle_call({append_term, Term}, _From, Fd) ->
+ Bin = term_to_binary(Term, [compressed]),
+ TermLen = size(Bin),
+ Bin2 = <<TermLen:32, Bin/binary>>,
+ {ok, Pos} = file:position(Fd, eof),
+ {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd};
+handle_call({pread_term, Pos}, _From, Fd) ->
+    {ok, <<TermLen:32>>} = file:pread(Fd, Pos, 4),
+ {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
+ {reply, {ok, binary_to_term(Bin)}, Fd}.
+
+
+handle_cast(close, Fd) ->
+ {stop,normal,Fd}. % causes terminate to be called
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(self_close, State) ->
+ {stop,normal,State};
+handle_info(_Info, State) ->
+ {noreply, State}.
diff --git a/src/couchdb/couch_ft_query.erl b/src/couchdb/couch_ft_query.erl
new file mode 100644
index 00000000..2d1b9fc5
--- /dev/null
+++ b/src/couchdb/couch_ft_query.erl
@@ -0,0 +1,78 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_ft_query).
+-behaviour(gen_server).
+
+-export([start_link/1, execute/2]).
+
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3, stop/0]).
+
+-define(ERR_HANDLE, {Port, {exit_status, Status}} -> {stop, {unknown_error, Status}, {unknown_error, Status}, Port}).
+
+start_link(QueryExec) ->
+ gen_server:start_link({local, couch_ft_query}, couch_ft_query, QueryExec, []).
+
+stop() ->
+ exit(whereis(couch_ft_query), close).
+
+execute(DatabaseName, QueryString) ->
+ gen_server:call(couch_ft_query, {ft_query, DatabaseName, QueryString}).
+
+init(QueryExec) ->
+ Port = open_port({spawn, QueryExec}, [{line, 1000}, exit_status, hide]),
+ {ok, Port}.
+
+terminate(_Reason, _Server) ->
+ ok.
+
+handle_call({ft_query, Database, QueryText}, _From, Port) ->
+ %% send the database name
+ true = port_command(Port, Database ++ "\n"),
+ true = port_command(Port, QueryText ++ "\n"),
+ case get_line(Port) of
+ "ok" ->
+ DocIds = read_query_results(Port, []),
+ {reply, {ok, DocIds}, Port};
+ "error" ->
+ ErrorId = get_line(Port),
+ ErrorMsg = get_line(Port),
+ {reply, {list_to_atom(ErrorId), ErrorMsg}, Port}
+ end.
+
+read_query_results(Port, Acc) ->
+ case get_line(Port) of
+ "" -> % line by itself means all done
+ lists:reverse(Acc);
+ DocId ->
+ Score = get_line(Port),
+ read_query_results(Port, [{DocId, Score} | Acc])
+ end.
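+
+% Example exchange (illustrative): execute("mydb", "apple") writes the lines
+% "mydb" and "apple" to the port; a successful helper replies "ok", then
+% alternating DocId and Score lines, and finishes with an empty line.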
+
+
+get_line(Port) ->
+ receive
+ {Port, {data, {eol, Line}}} ->
+ Line;
+ ?ERR_HANDLE
+ end.
+
+handle_cast(_Whatever, State) ->
+ {noreply, State}.
+
+handle_info({Port, {exit_status, Status}}, Port) ->
+ {stop, {os_process_exited, Status}, Port};
+handle_info(_Whatever, State) ->
+ {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/couchdb/couch_js.c b/src/couchdb/couch_js.c
new file mode 100644
index 00000000..a234fda9
--- /dev/null
+++ b/src/couchdb/couch_js.c
@@ -0,0 +1,452 @@
+/*
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of the
+License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed
+under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+
+*/
+
+#include <stdio.h>
+#include <jsapi.h>
+
+int gExitCode = 0;
+int gStackChunkSize = 8L * 1024L;
+
+int
+EncodeChar(uint8 *utf8Buffer, uint32 ucs4Char) {
+ int utf8Length = 1;
+
+ if (ucs4Char < 0x80) {
+ *utf8Buffer = (uint8)ucs4Char;
+ } else {
+ int i;
+ uint32 a = ucs4Char >> 11;
+ utf8Length = 2;
+ while (a) {
+ a >>= 5;
+ utf8Length++;
+ }
+ i = utf8Length;
+ while (--i) {
+ utf8Buffer[i] = (uint8)((ucs4Char & 0x3F) | 0x80);
+ ucs4Char >>= 6;
+ }
+ *utf8Buffer = (uint8)(0x100 - (1 << (8-utf8Length)) + ucs4Char);
+ }
+ return utf8Length;
+}
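+
+// Worked example (illustrative): for U+00E9, ucs4Char >= 0x80 and
+// (0xE9 >> 11) == 0, so utf8Length stays 2; the loop stores the trail byte
+// (0xE9 & 0x3F) | 0x80 == 0xA9, and the lead byte computes to
+// 0x100 - (1 << 6) + (0xE9 >> 6) == 0xC3, giving the UTF-8 sequence C3 A9.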
+
+JSBool
+EncodeString(const jschar *src, size_t srclen, char *dst, size_t *dstlenp) {
+ size_t i, utf8Len, dstlen = *dstlenp, origDstlen = dstlen;
+ jschar c, c2;
+ uint32 v;
+ uint8 utf8buf[6];
+
+ if (!dst)
+ dstlen = origDstlen = (size_t) -1;
+
+ while (srclen) {
+ c = *src++;
+ srclen--;
+ if ((c >= 0xDC00) && (c <= 0xDFFF))
+ goto badSurrogate;
+ if (c < 0xD800 || c > 0xDBFF) {
+ v = c;
+ } else {
+ if (srclen < 1)
+ goto bufferTooSmall;
+ c2 = *src++;
+ srclen--;
+ if ((c2 < 0xDC00) || (c2 > 0xDFFF)) {
+ c = c2;
+ goto badSurrogate;
+ }
+ v = ((c - 0xD800) << 10) + (c2 - 0xDC00) + 0x10000;
+ }
+ if (v < 0x0080) {
+ /* no encoding necessary - performance hack */
+ if (!dstlen)
+ goto bufferTooSmall;
+ if (dst)
+ *dst++ = (char) v;
+ utf8Len = 1;
+ } else {
+ utf8Len = EncodeChar(utf8buf, v);
+ if (utf8Len > dstlen)
+ goto bufferTooSmall;
+ if (dst) {
+ for (i = 0; i < utf8Len; i++)
+ *dst++ = (char) utf8buf[i];
+ }
+ }
+ dstlen -= utf8Len;
+ }
+ *dstlenp = (origDstlen - dstlen);
+ return JS_TRUE;
+
+badSurrogate:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+
+bufferTooSmall:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+}
+
+static uint32
+DecodeChar(const uint8 *utf8Buffer, int utf8Length) {
+ uint32 ucs4Char;
+ uint32 minucs4Char;
+ /* from Unicode 3.1, non-shortest form is illegal */
+ static const uint32 minucs4Table[] = {
+        0x00000080, 0x00000800, 0x00010000, 0x00200000, 0x04000000
+ };
+
+ if (utf8Length == 1) {
+ ucs4Char = *utf8Buffer;
+ } else {
+ ucs4Char = *utf8Buffer++ & ((1<<(7-utf8Length))-1);
+ minucs4Char = minucs4Table[utf8Length-2];
+ while (--utf8Length) {
+ ucs4Char = ucs4Char<<6 | (*utf8Buffer++ & 0x3F);
+ }
+ if (ucs4Char < minucs4Char ||
+ ucs4Char == 0xFFFE || ucs4Char == 0xFFFF) {
+ ucs4Char = 0xFFFD;
+ }
+ }
+ return ucs4Char;
+}
+
+JSBool
+DecodeString(const char *src, size_t srclen, jschar *dst, size_t *dstlenp) {
+ uint32 v;
+ size_t offset = 0, j, n, dstlen = *dstlenp, origDstlen = dstlen;
+
+ if (!dst)
+ dstlen = origDstlen = (size_t) -1;
+
+ while (srclen) {
+ v = (uint8) *src;
+ n = 1;
+ if (v & 0x80) {
+ while (v & (0x80 >> n))
+ n++;
+ if (n > srclen)
+ goto bufferTooSmall;
+ if (n == 1 || n > 6)
+ goto badCharacter;
+ for (j = 1; j < n; j++) {
+ if ((src[j] & 0xC0) != 0x80)
+ goto badCharacter;
+ }
+ v = DecodeChar((const uint8 *) src, n);
+ if (v >= 0x10000) {
+ v -= 0x10000;
+ if (v > 0xFFFFF || dstlen < 2) {
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+ }
+ if (dstlen < 2)
+ goto bufferTooSmall;
+ if (dst) {
+ *dst++ = (jschar)((v >> 10) + 0xD800);
+ v = (jschar)((v & 0x3FF) + 0xDC00);
+ }
+ dstlen--;
+ }
+ }
+ if (!dstlen)
+ goto bufferTooSmall;
+ if (dst)
+ *dst++ = (jschar) v;
+ dstlen--;
+ offset += n;
+ src += n;
+ srclen -= n;
+ }
+ *dstlenp = (origDstlen - dstlen);
+ return JS_TRUE;
+
+badCharacter:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+
+bufferTooSmall:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+}
+
+static JSBool
+EvalInContext(JSContext *context, JSObject *obj, uintN argc, jsval *argv,
+ jsval *rval) {
+ JSString *str;
+ JSObject *sandbox;
+ JSContext *sub_context;
+ const jschar *src;
+ size_t srclen;
+ JSBool ok;
+ jsval v;
+
+ sandbox = NULL;
+ if (!JS_ConvertArguments(context, argc, argv, "S / o", &str, &sandbox))
+ return JS_FALSE;
+
+ sub_context = JS_NewContext(JS_GetRuntime(context), gStackChunkSize);
+ if (!sub_context) {
+ JS_ReportOutOfMemory(context);
+ return JS_FALSE;
+ }
+
+ src = JS_GetStringChars(str);
+ srclen = JS_GetStringLength(str);
+
+ if (!sandbox) {
+ sandbox = JS_NewObject(sub_context, NULL, NULL, NULL);
+ if (!sandbox || !JS_InitStandardClasses(sub_context, sandbox)) {
+ ok = JS_FALSE;
+ goto out;
+ }
+ }
+
+ if (srclen == 0) {
+ *rval = OBJECT_TO_JSVAL(sandbox);
+ ok = JS_TRUE;
+ } else {
+ ok = JS_EvaluateUCScript(sub_context, sandbox, src, srclen, NULL, -1,
+ rval);
+ }
+
+out:
+ JS_DestroyContext(sub_context);
+ return ok;
+}
+
+static JSBool
+GC(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ JS_GC(context);
+ return JS_TRUE;
+}
+
+static JSBool
+Print(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ uintN i, n;
+ size_t cl, bl;
+ JSString *str;
+ jschar *chars;
+ char *bytes;
+
+ for (i = n = 0; i < argc; i++) {
+ str = JS_ValueToString(context, argv[i]);
+ if (!str)
+ return JS_FALSE;
+ chars = JS_GetStringChars(str);
+ cl = JS_GetStringLength(str);
+ if (!EncodeString(chars, cl, NULL, &bl))
+ return JS_FALSE;
+ bytes = JS_malloc(context, bl + 1);
+ bytes[bl] = '\0';
+ if (!EncodeString(chars, cl, bytes, &bl)) {
+ JS_free(context, bytes);
+ return JS_FALSE;
+ }
+        fprintf(stdout, "%s%s", i ? " " : "", bytes);
+        JS_free(context, bytes);
+        n++;
+    }
+    if (n)
+        fputc('\n', stdout);
+ fflush(stdout);
+ return JS_TRUE;
+}
+
+static JSBool
+Quit(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ JS_ConvertArguments(context, argc, argv, "/ i", &gExitCode);
+ return JS_FALSE;
+}
+
+static JSBool
+ReadLine(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ char *bytes, *tmp;
+ jschar *chars;
+ size_t bufsize, byteslen, charslen, readlen;
+ JSString *str;
+
+ JS_MaybeGC(context);
+
+ byteslen = 0;
+ bufsize = 256;
+ bytes = JS_malloc(context, bufsize);
+ if (!bytes)
+ return JS_FALSE;
+
+ while ((readlen = js_fgets(bytes + byteslen, bufsize - byteslen, stdin)) > 0) {
+ byteslen += readlen;
+
+ /* Are we done? */
+ if (bytes[byteslen - 1] == '\n') {
+ bytes[byteslen - 1] = '\0';
+ break;
+ }
+
+ /* Else, grow our buffer for another pass */
+ tmp = JS_realloc(context, bytes, bufsize * 2);
+ if (!tmp) {
+ JS_free(context, bytes);
+ return JS_FALSE;
+ }
+
+ bufsize *= 2;
+ bytes = tmp;
+ }
+
+ /* Treat the empty string specially */
+ if (byteslen == 0) {
+ *rval = JS_GetEmptyStringValue(context);
+ JS_free(context, bytes);
+ return JS_TRUE;
+ }
+
+ /* Shrink the buffer to the real size */
+ tmp = JS_realloc(context, bytes, byteslen);
+ if (!tmp) {
+ JS_free(context, bytes);
+ return JS_FALSE;
+ }
+ bytes = tmp;
+
+ /* Decode the string from UTF-8 */
+ if (!DecodeString(bytes, byteslen, NULL, &charslen)) {
+ JS_free(context, bytes);
+ return JS_FALSE;
+ }
+    chars = JS_malloc(context, (charslen + 1) * sizeof(jschar));
+    if (!chars) {
+        JS_free(context, bytes);
+        return JS_FALSE;
+    }
+ if (!DecodeString(bytes, byteslen, chars, &charslen)) {
+ JS_free(context, bytes);
+ JS_free(context, chars);
+ return JS_FALSE;
+ }
+ chars[charslen] = '\0';
+
+ /* Initialize a JSString object */
+ str = JS_NewUCString(context, chars, charslen - 1);
+ if (!str) {
+ JS_free(context, bytes);
+ JS_free(context, chars);
+ return JS_FALSE;
+ }
+
+    JS_free(context, bytes); /* the UTF-8 buffer is no longer needed */
+    *rval = STRING_TO_JSVAL(str);
+    return JS_TRUE;
+}
+
+static JSBool
+Seal(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ JSObject *target;
+ JSBool deep = JS_FALSE;
+
+ if (!JS_ConvertArguments(context, argc, argv, "o/b", &target, &deep))
+ return JS_FALSE;
+ if (!target)
+ return JS_TRUE;
+ return JS_SealObject(context, target, deep);
+}
+
+static void
+ExecuteScript(JSContext *context, JSObject *obj, const char *filename) {
+ FILE *file;
+ JSScript *script;
+ jsval result;
+
+ if (!filename || strcmp(filename, "-") == 0) {
+ file = stdin;
+ } else {
+ file = fopen(filename, "r");
+ if (!file) {
+ fprintf(stderr, "could not open script file %s\n", filename);
+ gExitCode = 1;
+ return;
+ }
+ }
+
+ script = JS_CompileFileHandle(context, obj, filename, file);
+ if (script) {
+ JS_ExecuteScript(context, obj, script, &result);
+ JS_DestroyScript(context, script);
+ }
+}
+
+static uint32 gBranchCount = 0;
+static uint32 gBranchLimit = 100 * 1024;
+
+static JSBool
+BranchCallback(JSContext *context, JSScript *script) {
+ if (++gBranchCount == gBranchLimit) {
+ gBranchCount = 0;
+ return JS_FALSE;
+ }
+ if ((gBranchCount & 0x3fff) == 1) {
+ JS_MaybeGC(context);
+ }
+ return JS_TRUE;
+}
+
+static void
+PrintError(JSContext *context, const char *message, JSErrorReport *report) {
+ if (!report || !JSREPORT_IS_WARNING(report->flags))
+ fprintf(stderr, "%s\n", message);
+}
+
+int
+main(int argc, const char * argv[]) {
+ JSRuntime *runtime;
+ JSContext *context;
+ JSObject *global;
+
+ runtime = JS_NewRuntime(64L * 1024L * 1024L);
+ if (!runtime)
+ return 1;
+ context = JS_NewContext(runtime, gStackChunkSize);
+ if (!context)
+ return 1;
+ JS_SetErrorReporter(context, PrintError);
+ JS_SetBranchCallback(context, BranchCallback);
+ JS_ToggleOptions(context, JSOPTION_NATIVE_BRANCH_CALLBACK);
+ JS_ToggleOptions(context, JSOPTION_XML);
+
+ global = JS_NewObject(context, NULL, NULL, NULL);
+ if (!global)
+ return 1;
+ if (!JS_InitStandardClasses(context, global))
+ return 1;
+ if (!JS_DefineFunction(context, global, "evalcx", EvalInContext, 0, 0)
+ || !JS_DefineFunction(context, global, "gc", GC, 0, 0)
+ || !JS_DefineFunction(context, global, "print", Print, 0, 0)
+ || !JS_DefineFunction(context, global, "quit", Quit, 0, 0)
+ || !JS_DefineFunction(context, global, "readline", ReadLine, 0, 0)
+ || !JS_DefineFunction(context, global, "seal", Seal, 0, 0))
+ return 1;
+
+ if (argc != 2) {
+ fprintf(stderr, "incorrect number of arguments\n\n");
+ fprintf(stderr, "usage: %s <scriptfile>\n", argv[0]);
+ return 2;
+ }
+
+ ExecuteScript(context, global, argv[1]);
+
+ JS_DestroyContext(context);
+ JS_DestroyRuntime(runtime);
+ JS_ShutDown();
+
+ return gExitCode;
+}
diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl
new file mode 100644
index 00000000..705365bd
--- /dev/null
+++ b/src/couchdb/couch_key_tree.erl
@@ -0,0 +1,139 @@
+% Copyright 2007, 2008 Damien Katz <damien_katz@yahoo.com>
+%
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+
+-module(couch_key_tree).
+
+-export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]).
+-export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1]).
+
+% a key tree looks like this:
+% Tree -> [] or [{Key, Value, Tree} | SiblingTree]
+% ChildTree -> Tree
+% SiblingTree -> [] or [{SiblingKey, Value, Tree} | SiblingTree]
+% And each Key < SiblingKey
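+%
+% Example (illustrative): a doc whose rev "1" has a child rev "2" and a
+% conflicting sibling "2b" is represented as
+%   [{"1", V1, [{"2", V2, []}, {"2b", V2b, []}]}]
+% where the sibling ordering "2" < "2b" preserves the invariant above.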
+
+
+
+% key tree functions
+
+% When the same key is found in the trees, the value in tree B is discarded.
+merge([], B) ->
+ B;
+merge(A, []) ->
+ A;
+merge([ATree | ANextTree], [BTree | BNextTree]) ->
+ {AKey, AValue, ASubTree} = ATree,
+ {BKey, _BValue, BSubTree} = BTree,
+ if
+ AKey == BKey ->
+ %same key
+ MergedSubTree = merge(ASubTree, BSubTree),
+ MergedNextTree = merge(ANextTree, BNextTree),
+ [{AKey, AValue, MergedSubTree} | MergedNextTree];
+ AKey < BKey ->
+ [ATree | merge(ANextTree, [BTree | BNextTree])];
+ true ->
+ [BTree | merge([ATree | ANextTree], BNextTree)]
+ end.
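+
+% Example (illustrative): merging a one-node tree with an extension of the
+% same path grafts the new leaf under the shared key, keeping A's value:
+%   merge([{"1", V, []}], [{"1", ignored, [{"2", W, []}]}])
+%       =:= [{"1", V, [{"2", W, []}]}]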
+
+find_missing(_Tree, []) ->
+ [];
+find_missing([], Keys) ->
+ Keys;
+find_missing([{Key, _, SubTree} | RestTree], Keys) ->
+    SrcKeys2 = Keys -- [Key],
+ SrcKeys3 = find_missing(SubTree, SrcKeys2),
+ find_missing(RestTree, SrcKeys3).
+
+
+% get the leafs in the tree matching the keys. The matching key nodes can be
+% leafs or inner nodes. If an inner node, then the leafs for that node are
+% returned.
+get_key_leafs(Tree, Keys) ->
+ get_key_leafs(Tree, Keys, []).
+
+get_key_leafs(_Tree, [], _KeyPathAcc) ->
+ {[], []};
+get_key_leafs([], KeysToGet, _KeyPathAcc) ->
+ {[], KeysToGet};
+get_key_leafs([{Key, _Value, SubTree}=Tree | RestTree], KeysToGet, KeyPathAcc) ->
+ case KeysToGet -- [Key] of
+ KeysToGet -> % same list, key not found
+ {LeafsFound, KeysToGet2} = get_key_leafs(SubTree, KeysToGet, [Key | KeyPathAcc]),
+ {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc),
+ {LeafsFound ++ RestLeafsFound, KeysRemaining};
+    KeysToGet2 ->
+        LeafsFound = get_all_leafs([Tree], KeyPathAcc),
+        LeafKeysFound = [LeafKeyFound || {LeafKeyFound, _, _} <- LeafsFound],
+        % bind a fresh variable; re-matching the already-bound KeysToGet2
+        % would fail whenever a requested key appears among the leafs found
+        KeysToGet3 = KeysToGet2 -- LeafKeysFound,
+        {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet3, KeyPathAcc),
+        {LeafsFound ++ RestLeafsFound, KeysRemaining}
+ end.
+
+get(Tree, KeysToGet) ->
+ {KeyPaths, KeysNotFound} = get_full_key_paths(Tree, KeysToGet),
+ FixedResults = [ {Key, Value, [Key0 || {Key0, _} <- Path]} || [{Key, Value}|_] = Path <- KeyPaths],
+ {FixedResults, KeysNotFound}.
+
+get_full_key_paths(Tree, Keys) ->
+ get_full_key_paths(Tree, Keys, []).
+
+get_full_key_paths(_Tree, [], _KeyPathAcc) ->
+ {[], []};
+get_full_key_paths([], KeysToGet, _KeyPathAcc) ->
+ {[], KeysToGet};
+get_full_key_paths([{KeyId, Value, SubTree} | RestTree], KeysToGet, KeyPathAcc) ->
+ KeysToGet2 = KeysToGet -- [KeyId],
+ CurrentNodeResult =
+ case length(KeysToGet2) == length(KeysToGet) of
+ true -> % not in the key list.
+ [];
+        false -> % this node is in the key list; return it
+ [[{KeyId, Value} | KeyPathAcc]]
+ end,
+ {KeysGotten, KeysRemaining} = get_full_key_paths(SubTree, KeysToGet2, [{KeyId, Value} | KeyPathAcc]),
+ {KeysGotten2, KeysRemaining2} = get_full_key_paths(RestTree, KeysRemaining, KeyPathAcc),
+ {CurrentNodeResult ++ KeysGotten ++ KeysGotten2, KeysRemaining2}.
+
+get_all_leafs(Tree) ->
+ get_all_leafs(Tree, []).
+
+get_all_leafs([], _KeyPathAcc) ->
+ [];
+get_all_leafs([{KeyId, Value, []} | RestTree], KeyPathAcc) ->
+ [{KeyId, Value, [KeyId | KeyPathAcc]} | get_all_leafs(RestTree, KeyPathAcc)];
+get_all_leafs([{KeyId, _Value, SubTree} | RestTree], KeyPathAcc) ->
+ get_all_leafs(SubTree, [KeyId | KeyPathAcc]) ++ get_all_leafs(RestTree, KeyPathAcc).
+
+get_leaf_keys([]) ->
+ [];
+get_leaf_keys([{Key, _Value, []} | RestTree]) ->
+ [Key | get_leaf_keys(RestTree)];
+get_leaf_keys([{_Key, _Value, SubTree} | RestTree]) ->
+ get_leaf_keys(SubTree) ++ get_leaf_keys(RestTree).
+
+count_leafs([]) ->
+ 0;
+count_leafs([{_Key, _Value, []} | RestTree]) ->
+ 1 + count_leafs(RestTree);
+count_leafs([{_Key, _Value, SubTree} | RestTree]) ->
+ count_leafs(SubTree) + count_leafs(RestTree).
+
+
+map(_Fun, []) ->
+ [];
+map(Fun, [{Key, Value, SubTree} | RestTree]) ->
+ Value2 = Fun(Key, Value),
+ [{Key, Value2, map(Fun, SubTree)} | map(Fun, RestTree)].
+
diff --git a/src/couchdb/couch_log.erl b/src/couchdb/couch_log.erl
new file mode 100644
index 00000000..47e0114d
--- /dev/null
+++ b/src/couchdb/couch_log.erl
@@ -0,0 +1,130 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_log).
+-behaviour(gen_event).
+
+-export([start_link/2,stop/0]).
+-export([error/1,error/2,info/1,info/2,debug/1,debug/2,get_level/0,get_level_integer/0, set_level/1]).
+-export([init/1, handle_event/2, terminate/2, code_change/3, handle_info/2, handle_call/2]).
+
+-define(LEVEL_ERROR, 3).
+-define(LEVEL_INFO, 2).
+-define(LEVEL_DEBUG, 1).
+-define(LEVEL_TMI, 0).
+
+level_integer(error) -> ?LEVEL_ERROR;
+level_integer(info) -> ?LEVEL_INFO;
+level_integer(debug) -> ?LEVEL_DEBUG;
+level_integer(tmi) -> ?LEVEL_TMI;
+level_integer(_Else) -> ?LEVEL_ERROR. % anything else defaults to ERROR level
+
+level_atom(?LEVEL_ERROR) -> error;
+level_atom(?LEVEL_INFO) -> info;
+level_atom(?LEVEL_DEBUG) -> debug;
+level_atom(?LEVEL_TMI) -> tmi.
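+
+% Example (illustrative): after set_level(info), info/2 messages are logged
+% while debug/2 messages are suppressed, since ?LEVEL_DEBUG < ?LEVEL_INFO.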
+
+
+start_link(Filename, Level) ->
+ couch_event_sup:start_link({local, couch_log}, error_logger, couch_log, {Filename, Level}).
+
+stop() ->
+ couch_event_sup:stop(couch_log).
+
+init({Filename, Level}) ->
+ {ok, Fd} = file:open(Filename, [append]),
+ {ok, {Fd, level_integer(Level)}}.
+
+error(Msg) ->
+ error("~s", [Msg]).
+
+error(Format, Args) ->
+ error_logger:error_report(couch_error, {Format, Args}).
+
+info(Msg) ->
+ info("~s", [Msg]).
+
+info(Format, Args) ->
+ case get_level_integer() =< ?LEVEL_INFO of
+ true ->
+ error_logger:info_report(couch_info, {Format, Args});
+ false ->
+ ok
+ end.
+
+debug(Msg) ->
+ debug("~s", [Msg]).
+
+debug(Format, Args) ->
+ case get_level_integer() =< ?LEVEL_DEBUG of
+ true ->
+ error_logger:info_report(couch_debug, {Format, Args});
+ false ->
+ ok
+ end.
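+
+% usage sketch (assumes the handler was installed via start_link/2; the
+% message arguments are illustrative):
+%   couch_log:set_level(debug),
+%   couch_log:info("opened db ~s", ["mydb"]),
+%   couch_log:debug("raw update: ~p", [Update])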
+
+set_level(LevelAtom) ->
+ set_level_integer(level_integer(LevelAtom)).
+
+get_level() ->
+ level_atom(get_level_integer()).
+
+get_level_integer() ->
+ catch gen_event:call(error_logger, couch_log, get_level_integer).
+
+set_level_integer(Int) ->
+ gen_event:call(error_logger, couch_log, {set_level_integer, Int}).
+
+handle_event({error_report, _, {Pid, couch_error, {Format, Args}}}, {Fd, _LogLevel}=State) ->
+ log(Fd, Pid, error, Format, Args),
+ {ok, State};
+handle_event({error_report, _, {Pid, _, _}}=Event, {Fd, _LogLevel}=State) ->
+ log(Fd, Pid, error, "~p", [Event]),
+ {ok, State};
+handle_event({error, _, {Pid, Format, Args}}, {Fd, _LogLevel}=State) ->
+ log(Fd, Pid, error, Format, Args),
+ {ok, State};
+handle_event({info_report, _, {Pid, couch_info, {Format, Args}}}, {Fd, LogLevel}=State)
+when LogLevel =< ?LEVEL_INFO ->
+ log(Fd, Pid, info, Format, Args),
+ {ok, State};
+handle_event({info_report, _, {Pid, couch_debug, {Format, Args}}}, {Fd, LogLevel}=State)
+when LogLevel =< ?LEVEL_DEBUG ->
+ log(Fd, Pid, debug, Format, Args),
+ {ok, State};
+handle_event({_, _, {Pid, _, _}}=Event, {Fd, LogLevel}=State)
+when LogLevel =< ?LEVEL_TMI ->
+ % log every remaining event if tmi!
+ log(Fd, Pid, tmi, "~p", [Event]),
+ {ok, State};
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(get_level_integer, {_Fd, LogLevel}=State) ->
+ {ok, LogLevel, State};
+handle_call({set_level_integer, NewLevel}, {Fd, _LogLevel}) ->
+ {ok, ok, {Fd, NewLevel}}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Arg, {Fd, _LoggingLevel}) ->
+ file:close(Fd).
+
+log(Fd, Pid, Level, Format, Args) ->
+ Msg = io_lib:format(Format, Args),
+ ok = io:format("[~s] [~p] ~s~n", [Level, Pid, Msg]), % dump to console too
+ {ok, Msg2, _} = regexp:gsub(lists:flatten(Msg),"\\r\\n|\\r|\\n", "\r\n"),
+ ok = io:format(Fd, "[~s] [~s] [~p] ~s\r~n\r~n", [httpd_util:rfc1123_date(), Level, Pid, Msg2]).
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
new file mode 100644
index 00000000..19cba9bd
--- /dev/null
+++ b/src/couchdb/couch_query_servers.erl
@@ -0,0 +1,206 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_query_servers).
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]).
+-export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
+
+-export([test/0, test/1]).
+
+-include("couch_db.hrl").
+
+timeout() ->
+ % hardcoded 5 sec timeout per document
+ 5000.
+
+start_link(QueryServerList) ->
+ gen_server:start_link({local, couch_query_servers}, couch_query_servers, QueryServerList, []).
+
+stop() ->
+ exit(whereis(couch_query_servers), close).
+
+readline(Port) ->
+ readline(Port, []).
+
+readline(Port, Acc) ->
+ Timer = erlang:send_after(timeout(), self(), timeout),
+ Result =
+ receive
+ {Port, {data, {noeol, Data}}} ->
+ readline(Port, [Data|Acc]);
+ {Port, {data, {eol, Data}}} ->
+ lists:flatten(lists:reverse(Acc, Data));
+ {Port, Err} ->
+ catch port_close(Port),
+ erlang:cancel_timer(Timer),
+ throw({map_process_error, Err});
+ timeout ->
+ catch port_close(Port),
+ throw({map_process_error, "map function timed out"})
+ end,
+ case erlang:cancel_timer(Timer) of
+ false ->
+ % message already sent. clear it
+ receive timeout -> ok end;
+ _ ->
+ ok
+ end,
+ Result.
+
+read_json(Port) ->
+ case cjson:decode(readline(Port)) of
+ {obj, [{"log", Msg}]} when is_list(Msg) ->
+ % we got a message to log. Log it and continue
+ couch_log:info("Query Server Log Message: ~s", [Msg]),
+ read_json(Port);
+ Else ->
+ Else
+ end.
+
+writeline(Port, String) ->
+ true = port_command(Port, String ++ "\n").
+
+% send command and get a response.
+prompt(Port, Json) ->
+ writeline(Port, cjson:encode(Json)),
+ read_json(Port).
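+
+% An illustrative exchange over this line-oriented protocol: each request is
+% one encoded JSON line, each response one line back (interleaved
+% {"log": ...} lines are consumed by read_json above):
+%   -> ["reset"]
+%   <- true
+%   -> ["add_fun", "function(doc) {if (doc[0] == 'a') return doc[1];}"]
+%   <- true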
+
+
+start_doc_map(Lang, Functions) ->
+ Port =
+ case gen_server:call(couch_query_servers, {get_port, Lang}) of
+ {ok, Port0} ->
+ link(Port0),
+ Port0;
+ {empty, Cmd} ->
+ couch_log:info("Spawning new ~s instance.", [Lang]),
+ open_port({spawn, Cmd}, [stream,
+ {line, 1000},
+ exit_status,
+ hide]);
+ Error ->
+ throw(Error)
+ end,
+ true = prompt(Port, {"reset"}),
+ % send the functions as json strings
+ lists:foreach(fun(FunctionSource) ->
+ case prompt(Port, {"add_fun", FunctionSource}) of
+ true -> ok;
+ {obj, [{"error", Id}, {"reason", Reason}]} ->
+ throw({Id, Reason})
+ end
+ end,
+ Functions),
+ {ok, {Lang, Port}}.
+
+map_docs({_Lang, Port}, Docs) ->
+ % send the documents
+ Results =
+ lists:map(
+ fun(Doc) ->
+ Json = couch_doc:to_json_obj(Doc, []),
+ case prompt(Port, {"map_doc", Json}) of
+ {obj, [{"error", Id}, {"reason", Reason}]} ->
+ throw({list_to_atom(Id),Reason});
+ {obj, [{"reason", Reason}, {"error", Id}]} ->
+ throw({list_to_atom(Id),Reason});
+ Results when is_tuple(Results) ->
+ % the results are a json array of function map yields like this:
+ % {FunResults1, FunResults2 ...}
+                % where each FunResults is a json array of key value pairs:
+ % {{Key1, Value1}, {Key2, Value2}}
+                % Convert to real lists, except the key, value pairs
+ [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)]
+ end
+ end,
+ Docs),
+ {ok, Results}.
+
+
+stop_doc_map(nil) ->
+ ok;
+stop_doc_map({Lang, Port}) ->
+ ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}),
+ true = unlink(Port),
+ ok.
+
+init(QueryServerList) ->
+ {ok, {QueryServerList, []}}.
+
+terminate(_Reason, _Server) ->
+ ok.
+
+
+handle_call({get_port, Lang}, {FromPid, _}, {QueryServerList, LangPorts}) ->
+ case lists:keysearch(Lang, 1, LangPorts) of
+ {value, {_, Port}=LangPort} ->
+ Result =
+ case catch port_connect(Port, FromPid) of
+ true ->
+ true = unlink(Port),
+ {ok, Port};
+ Error ->
+ catch port_close(Port),
+ Error
+ end,
+ {reply, Result, {QueryServerList, LangPorts -- [LangPort]}};
+ false ->
+ case lists:keysearch(Lang, 1, QueryServerList) of
+ {value, {_, ServerCmd}} ->
+ {reply, {empty, ServerCmd}, {QueryServerList, LangPorts}};
+ false -> % not a supported language
+ {reply, {query_language_unknown, Lang}, {QueryServerList, LangPorts}}
+ end
+ end;
+handle_call({return_port, {Lang, Port}}, _From, {QueryServerList, LangPorts}) ->
+ case catch port_connect(Port, self()) of
+ true ->
+ {reply, ok, {QueryServerList, [{Lang, Port} | LangPorts]}};
+ _ ->
+ catch port_close(Port),
+ {reply, ok, {QueryServerList, LangPorts}}
+ end.
+
+handle_cast(_Whatever, {Cmd, Ports}) ->
+ {noreply, {Cmd, Ports}}.
+
+handle_info({Port, {exit_status, Status}}, {QueryServerList, LangPorts}) ->
+ case lists:keysearch(Port, 2, LangPorts) of
+ {value, {Lang, _}} ->
+ case Status of
+ 0 -> ok;
+ _ -> couch_log:error("Abnormal shutdown of ~s query server process (exit_status: ~w).", [Lang, Status])
+ end,
+ {noreply, {QueryServerList, lists:keydelete(Port, 2, LangPorts)}};
+ _ ->
+ couch_log:error("Unknown linked port/process crash: ~p", [Port])
+ end;
+handle_info(_Whatever, {Cmd, Ports}) ->
+ {noreply, {Cmd, Ports}}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+test() ->
+ test("../js/js -f main.js").
+
+test(Cmd) ->
+ start_link(Cmd),
+ {ok, DocMap} = start_doc_map("javascript", ["function(doc) {if (doc[0] == 'a') return doc[1];}"]),
+ {ok, Results} = map_docs(DocMap, [#doc{body={"a", "b"}}, #doc{body={"c", "d"}},#doc{body={"a", "c"}}]),
+ io:format("Results: ~w~n", [Results]),
+ stop_doc_map(DocMap),
+ ok.
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
new file mode 100644
index 00000000..9590d5c1
--- /dev/null
+++ b/src/couchdb/couch_rep.erl
@@ -0,0 +1,308 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep).
+
+-include("couch_db.hrl").
+
+-export([replicate/2, replicate/3, test/0, test_write_docs/3]).
+
+-record(stats, {
+ docs_read=0,
+ read_errors=0,
+ docs_copied=0,
+ copy_errors=0
+ }).
+
+
+url_encode([H|T]) ->
+ if
+ H >= $a, $z >= H ->
+ [H|url_encode(T)];
+ H >= $A, $Z >= H ->
+ [H|url_encode(T)];
+ H >= $0, $9 >= H ->
+ [H|url_encode(T)];
+ H == $_; H == $.; H == $-; H == $: ->
+ [H|url_encode(T)];
+ true ->
+ case lists:flatten(io_lib:format("~.16.0B", [H])) of
+ [X, Y] ->
+ [$%, X, Y | url_encode(T)];
+ [X] ->
+ [$%, $0, X | url_encode(T)]
+ end
+ end;
+url_encode([]) ->
+ [].
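+
+% examples: alphanumerics and the bytes _ . - : pass through unchanged,
+% everything else is %-hex-escaped
+%   url_encode("a b")   -> "a%20b"
+%   url_encode("a:b-c") -> "a:b-c"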
+
+
+replicate(DbNameA, DbNameB) ->
+ replicate(DbNameA, DbNameB, []).
+
+replicate(Source, Target, Options) ->
+ {ok, DbSrc} = open_db(Source),
+ {ok, DbTgt} = open_db(Target),
+ {ok, HostName} = inet:gethostname(),
+
+ RepRecKey = ?LOCAL_DOC_PREFIX ++ HostName ++ ":" ++ Source ++ ":" ++ Target,
+ StartTime = httpd_util:rfc1123_date(),
+ RepRecSrc =
+ case open_doc(DbSrc, RepRecKey, []) of
+ {ok, SrcDoc} -> SrcDoc;
+ _ -> #doc{id=RepRecKey}
+ end,
+
+ RepRecTgt =
+ case open_doc(DbTgt, RepRecKey, []) of
+ {ok, TgtDoc} -> TgtDoc;
+ _ -> #doc{id=RepRecKey}
+ end,
+
+ #doc{body={obj,OldRepHistoryProps}} = RepRecSrc,
+ #doc{body={obj,OldRepHistoryPropsTrg}} = RepRecTgt,
+
+ SeqNum0 =
+ case OldRepHistoryProps == OldRepHistoryPropsTrg of
+ true ->
+ % if the records are identical, then we have a valid replication history
+ proplists:get_value("source_last_seq", OldRepHistoryProps, 0);
+ false ->
+ 0
+ end,
+
+ SeqNum =
+ case proplists:get_value(full, Options, false)
+ orelse proplists:get_value("full", Options, false) of
+ true -> 0;
+ false -> SeqNum0
+ end,
+
+ {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum, #stats{}),
+ case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
+ true ->
+ % nothing changed, don't record results
+ {ok, {obj, OldRepHistoryProps}};
+ false ->
+ HistEntries =[
+ {obj,
+ [{"start_time", StartTime},
+ {"end_time", httpd_util:rfc1123_date()},
+ {"start_last_seq", SeqNum},
+ {"end_last_seq", NewSeqNum},
+ {"docs_read", Stats#stats.docs_read},
+ {"read_errors", Stats#stats.read_errors},
+ {"docs_copied", Stats#stats.docs_copied},
+ {"copy_errors", Stats#stats.copy_errors}]}
+ | tuple_to_list(proplists:get_value("history", OldRepHistoryProps, {}))],
+ % something changed, record results
+ NewRepHistory =
+ {obj,
+ [{"session_id", couch_util:new_uuid()},
+ {"source_last_seq", NewSeqNum},
+ {"history", list_to_tuple(lists:sublist(HistEntries, 50))}]},
+
+ {ok, _} = update_doc(DbSrc, RepRecSrc#doc{body=NewRepHistory}, []),
+ {ok, _} = update_doc(DbTgt, RepRecTgt#doc{body=NewRepHistory}, []),
+ {ok, NewRepHistory}
+ end.
+
+pull_rep(DbTarget, DbSource, SourceSeqNum, Stats) ->
+ {ok, NewSeq} =
+ enum_docs_since(DbSource, SourceSeqNum,
+ fun(#doc_info{update_seq=Seq}=SrcDocInfo, _, {_, AccStats}) ->
+ Stats2 = maybe_save_docs(DbTarget, DbSource, SrcDocInfo, AccStats),
+ {ok, {Seq, Stats2}}
+ end, {SourceSeqNum, Stats}),
+ NewSeq.
+
+
+maybe_save_docs(DbTarget, DbSource,
+ #doc_info{id=Id, rev=Rev, conflict_revs=Conflicts, deleted_conflict_revs=DelConflicts},
+ Stats) ->
+ SrcRevs = [Rev | Conflicts] ++ DelConflicts,
+ {ok, [{Id, MissingRevs}]} = get_missing_revs(DbTarget, [{Id, SrcRevs}]),
+
+ case MissingRevs of
+ [] ->
+ Stats;
+ _Else ->
+        % the 'ok' below asserts there were no unrecoverable errors (like network failure, etc).
+ {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
+
+ Docs = [RevDoc || {ok, RevDoc} <- DocResults], % only match successful loads
+
+ Stats2 = Stats#stats{
+ docs_read=Stats#stats.docs_read + length(Docs),
+ read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs)},
+
+ case Docs of
+ [] ->
+ Stats2;
+ _ ->
+            % the 'ok' below asserts there were no unrecoverable errors (like network failure, etc).
+ ok = save_docs(DbTarget, Docs, []),
+ Stats2#stats{docs_copied=Stats2#stats.docs_copied+length(Docs)}
+ end
+ end.
+
+
+do_http_request(Url, Action) ->
+ do_http_request(Url, Action, []).
+
+do_http_request(Url, Action, JsonBody) ->
+ couch_log:debug("couch_rep HTTP client request:"),
+ couch_log:debug("\tAction: ~p", [Action]),
+ couch_log:debug("\tUrl: ~p", [Url]),
+ Request =
+ case JsonBody of
+ [] ->
+ {Url, []};
+ _ ->
+ {Url, [], "application/json; charset=utf-8", lists:flatten(cjson:encode(JsonBody))}
+ end,
+ {ok, {{_, ResponseCode,_},_Headers, ResponseBody}} = http:request(Action, Request, [], []),
+ if
+ ResponseCode >= 200, ResponseCode < 500 ->
+ cjson:decode(ResponseBody)
+ end.
+
+enum_docs0(_InFun, [], Acc) ->
+ Acc;
+enum_docs0(InFun, [DocInfo | Rest], Acc) ->
+ case InFun(DocInfo, 0, Acc) of
+ {ok, Acc2} -> enum_docs0(InFun, Rest, Acc2);
+ {stop, Acc2} -> Acc2
+ end.
+
+open_db("http" ++ DbName)->
+ case lists:last(DbName) of
+ $/ ->
+ {ok, "http" ++ DbName};
+ _ ->
+ {ok, "http" ++ DbName ++ "/"}
+ end;
+open_db(DbName)->
+ couch_server:open(DbName).
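+
+% examples: remote targets are normalized to end with a slash, local names
+% are opened through couch_server
+%   open_db("http://localhost:5984/replica_a") -> {ok, "http://localhost:5984/replica_a/"}
+%   open_db("replica_a")                       -> couch_server:open("replica_a")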
+
+
+enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) ->
+ Url = DbUrl ++ "_all_docs_by_seq?startkey=" ++ integer_to_list(StartSeq),
+ {obj, Results} = do_http_request(Url, get),
+ DocInfoList=
+ lists:map(fun({obj, RowInfoList}) ->
+ {obj, RowValueProps} = proplists:get_value("value", RowInfoList),
+ #doc_info{
+ id=proplists:get_value("id", RowInfoList),
+ rev=proplists:get_value("rev", RowValueProps),
+ update_seq = proplists:get_value("key", RowInfoList),
+ conflict_revs =
+ tuple_to_list(proplists:get_value("conflicts", RowValueProps, {})),
+ deleted_conflict_revs =
+ tuple_to_list(proplists:get_value("deleted_conflicts", RowValueProps, {})),
+ deleted = proplists:get_value("deleted", RowValueProps, false)}
+ end, tuple_to_list(proplists:get_value("rows", Results))),
+ {ok, enum_docs0(InFun, DocInfoList, InAcc)};
+enum_docs_since(DbSource, StartSeq, Fun, Acc) ->
+ couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc).
+
+get_missing_revs(DbUrl, DocIdRevsList) when is_list(DbUrl) ->
+ JsonDocIdRevsList = {obj,
+ [{Id, list_to_tuple(RevList)} || {Id, RevList} <- DocIdRevsList]},
+ {obj, ResponseMembers} =
+ do_http_request(DbUrl ++ "_missing_revs",
+ post, JsonDocIdRevsList),
+ {obj, DocMissingRevsList} = proplists:get_value("missing_revs", ResponseMembers),
+ {ok, [{Id, tuple_to_list(MissingRevs)} || {Id, MissingRevs} <- DocMissingRevsList]};
+get_missing_revs(Db, DocId) ->
+ couch_db:get_missing_revs(Db, DocId).
+
+
+update_doc(DbUrl, #doc{id=DocId}=Doc, _Options) when is_list(DbUrl) ->
+ Url = DbUrl ++ url_encode(DocId),
+ {obj, ResponseMembers} =
+ do_http_request(Url, put, couch_doc:to_json_obj(Doc, [revs,attachments])),
+ RevId = proplists:get_value("_rev", ResponseMembers),
+ {ok, RevId};
+update_doc(Db, Doc, Options) ->
+ couch_db:update_doc(Db, Doc, Options).
+
+save_docs(_, [], _) ->
+ ok;
+save_docs(DbUrl, Docs, []) when is_list(DbUrl) ->
+ JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+ {obj, Returned} =
+ do_http_request(DbUrl ++ "_bulk_docs", post, {obj, [{new_edits, false}, {docs, list_to_tuple(JsonDocs)}]}),
+ true = proplists:get_value("ok", Returned),
+ ok;
+save_docs(Db, Docs, Options) ->
+ couch_db:save_docs(Db, Docs, Options).
+
+
+open_doc(DbUrl, DocId, []) when is_list(DbUrl) ->
+ case do_http_request(DbUrl ++ url_encode(DocId), get) of
+ {obj, [{"error", ErrId}, {"reason", Reason}]} ->
+ {list_to_atom(ErrId), Reason};
+ Doc ->
+ {ok, couch_doc:from_json_obj(Doc)}
+ end;
+open_doc(Db, DocId, Options) when not is_list(Db) ->
+ couch_db:open_doc(Db, DocId, Options).
+
+
+open_doc_revs(DbUrl, DocId, Revs, Options) when is_list(DbUrl) ->
+ QueryOptionStrs =
+ lists:map(fun(latest) ->
+            % latest is the only option right now
+ "latest=true"
+ end, Options),
+ RevsQueryStrs = lists:flatten(cjson:encode(list_to_tuple(Revs))),
+ Url = DbUrl ++ DocId ++ "?" ++ couch_util:implode(["revs=true", "attachments=true", "open_revs=" ++ RevsQueryStrs ] ++ QueryOptionStrs, "&"),
+ JsonResults = do_http_request(Url, get, []),
+ Results =
+ lists:map(
+ fun({obj, [{"missing", Rev}]}) ->
+ {{not_found, missing}, Rev};
+ ({obj, [{"ok", JsonDoc}]}) ->
+ {ok, couch_doc:from_json_obj(JsonDoc)}
+ end, tuple_to_list(JsonResults)),
+ {ok, Results};
+open_doc_revs(Db, DocId, Revs, Options) ->
+ couch_db:open_doc_revs(Db, DocId, Revs, Options).
+
+
+
+
+
+test() ->
+ couch_server:start(),
+ %{ok, LocalA} = couch_server:open("replica_a"),
+ {ok, LocalA} = couch_server:create("replica_a", [overwrite]),
+ {ok, _} = couch_server:create("replica_b", [overwrite]),
+ %DbA = "replica_a",
+ DbA = "http://localhost:5984/replica_a/",
+ %DbB = "replica_b",
+ DbB = "http://localhost:5984/replica_b/",
+ _DocUnids = test_write_docs(10, LocalA, []),
+ replicate(DbA, DbB),
+ %{ok, _Rev} = couch_db:delete_doc(LocalA, lists:nth(1, DocUnids), any),
+ % replicate(DbA, DbB),
+ ok.
+
+test_write_docs(0, _Db, Output) ->
+ lists:reverse(Output);
+test_write_docs(N, Db, Output) ->
+ Doc = #doc{
+ id=integer_to_list(N),
+ body={obj, [{"foo", integer_to_list(N)}, {"num", N}, {"bar", "blah"}]}},
+ couch_db:save_doc(Db, Doc, []),
+ test_write_docs(N-1, Db, [integer_to_list(N) | Output]).
diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl
new file mode 100644
index 00000000..bb3617b2
--- /dev/null
+++ b/src/couchdb/couch_server.erl
@@ -0,0 +1,215 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_server).
+-behaviour(gen_server).
+-behaviour(application).
+
+-export([start/0,start/1,start/2,stop/0,stop/1]).
+-export([open/1,create/2,delete/1,all_databases/0,get_version/0]).
+-export([init/1, handle_call/3,sup_start_link/2]).
+-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
+-export([dev_start/0,remote_restart/0]).
+
+-include("couch_db.hrl").
+
+-record(server,{
+ root_dir = [],
+ dbname_regexp,
+ options=[]
+ }).
+
+start() ->
+ start("").
+
+start(IniFile) when is_atom(IniFile) ->
+ couch_server_sup:start_link(atom_to_list(IniFile) ++ ".ini");
+start(IniNum) when is_integer(IniNum) ->
+ couch_server_sup:start_link("couch" ++ integer_to_list(IniNum) ++ ".ini");
+start(IniFile) ->
+ couch_server_sup:start_link(IniFile).
+
+start(_Type, _Args) ->
+ start().
+
+stop() ->
+ couch_server_sup:stop().
+
+stop(_Reason) ->
+ stop().
+
+dev_start() ->
+ stop(),
+ up_to_date = make:all([load, debug_info]),
+ start().
+
+get_version() ->
+ Apps = application:loaded_applications(),
+ case lists:keysearch(couch, 1, Apps) of
+ {value, {_, _, Vsn}} ->
+ Vsn;
+ false ->
+ "0.0.0"
+ end.
+
+sup_start_link(RootDir, Options) ->
+ gen_server:start_link({local, couch_server}, couch_server, {RootDir, Options}, []).
+
+open(Filename) ->
+ gen_server:call(couch_server, {open, Filename}).
+
+create(Filename, Options) ->
+ gen_server:call(couch_server, {create, Filename, Options}).
+
+delete(Filename) ->
+ gen_server:call(couch_server, {delete, Filename}).
+
+remote_restart() ->
+ gen_server:call(couch_server, remote_restart).
+
+init({RootDir, Options}) ->
+ {ok, RegExp} = regexp:parse("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"),
+ {ok, #server{root_dir=RootDir, dbname_regexp=RegExp, options=Options}}.
+
+check_filename(#server{dbname_regexp=RegExp}, Filename) ->
+ case regexp:match(Filename, RegExp) of
+ nomatch ->
+ {error, illegal_database_name};
+ _Match ->
+ ok
+ end.
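+
+% examples, per the regexp compiled in init/1:
+%   check_filename(Server, "mydb")       -> ok
+%   check_filename(Server, "my_db/2008") -> ok
+%   check_filename(Server, "MyDB")       -> {error, illegal_database_name}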
+
+get_full_filename(Server, Filename) ->
+ filename:join([Server#server.root_dir, "./" ++ Filename ++ ".couch"]).
+
+
+terminate(_Reason, _Server) ->
+ ok.
+
+all_databases() ->
+ {ok, Root} = gen_server:call(couch_server, get_root),
+ Filenames =
+ filelib:fold_files(Root, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$", true,
+ fun(Filename, AccIn) ->
+ case Filename -- Root of
+ [$/ | RelativeFilename] -> ok;
+ RelativeFilename -> ok
+ end,
+ [filename:rootname(RelativeFilename, ".couch") | AccIn]
+ end, []),
+ {ok, Filenames}.
+
+
+handle_call(get_root, _From, #server{root_dir=Root}=Server) ->
+ {reply, {ok, Root}, Server};
+handle_call({open, Filename}, From, Server) ->
+ case check_filename(Server, Filename) of
+ {error, Error} ->
+ {reply, {error, Error}, Server};
+ ok ->
+ Filepath = get_full_filename(Server, Filename),
+ Result = supervisor:start_child(couch_server_sup,
+ {Filename,
+ {couch_db, open, [Filename, Filepath]},
+            transient,
+ infinity,
+ supervisor,
+ [couch_db]}),
+ case Result of
+ {ok, Db} ->
+ {reply, {ok, Db}, Server};
+ {error, already_present} ->
+ ok = supervisor:delete_child(couch_server_sup, Filename),
+ % call self recursively
+ handle_call({open, Filename}, From, Server);
+ {error, {already_started, Db}} ->
+ {reply, {ok, Db}, Server};
+ {error, {not_found, _}} ->
+ {reply, not_found, Server};
+ {error, {Error, _}} ->
+ {reply, {error, Error}, Server}
+ end
+ end;
+handle_call({create, Filename, Options}, _From, Server) ->
+ case check_filename(Server, Filename) of
+ {error, Error} ->
+ {reply, {error, Error}, Server};
+ ok ->
+ Filepath = get_full_filename(Server, Filename),
+ ChildSpec = {Filename,
+ {couch_db, create, [Filename, Filepath, Options]},
+ transient,
+ infinity,
+ supervisor,
+ [couch_db]},
+ Result =
+ case supervisor:delete_child(couch_server_sup, Filename) of
+ ok ->
+ sup_start_child(couch_server_sup, ChildSpec);
+ {error, not_found} ->
+ sup_start_child(couch_server_sup, ChildSpec);
+ {error, running} ->
+                % a server process for this database is already running. Maybe kill it
+ case lists:member(overwrite, Options) of
+ true ->
+ supervisor:terminate_child(couch_server_sup, Filename),
+ ok = supervisor:delete_child(couch_server_sup, Filename),
+ sup_start_child(couch_server_sup, ChildSpec);
+ false ->
+ {error, database_already_exists}
+ end
+ end,
+ case Result of
+ {ok, _Db} -> couch_db_update_notifier:notify({created, Filename});
+ _ -> ok
+ end,
+ {reply, Result, Server}
+ end;
+handle_call({delete, Filename}, _From, Server) ->
+ FullFilepath = get_full_filename(Server, Filename),
+ supervisor:terminate_child(couch_server_sup, Filename),
+ supervisor:delete_child(couch_server_sup, Filename),
+ case file:delete(FullFilepath) of
+ ok ->
+ couch_db_update_notifier:notify({deleted, Filename}),
+ {reply, ok, Server};
+ {error, enoent} ->
+ {reply, not_found, Server};
+ Else ->
+ {reply, Else, Server}
+ end;
+handle_call(remote_restart, _From, #server{options=Options}=Server) ->
+ case proplists:get_value(remote_restart, Options) of
+ true ->
+ exit(self(), restart);
+ _ ->
+ ok
+ end,
+ {reply, ok, Server}.
+
+% this function just strips the child spec info out of any error result
+sup_start_child(couch_server_sup, ChildSpec) ->
+ case supervisor:start_child(couch_server_sup, ChildSpec) of
+ {error, {Error, _ChildInfo}} ->
+ {error, Error};
+ Else ->
+ Else
+ end.
+
+handle_cast(_Msg, State) ->
+ {noreply,State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl
new file mode 100644
index 00000000..8b9889e7
--- /dev/null
+++ b/src/couchdb/couch_server_sup.erl
@@ -0,0 +1,185 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_server_sup).
+-behaviour(supervisor).
+
+-define(DEFAULT_INI, "couch.ini").
+
+-export([start_link/1,stop/0]).
+
+%% supervisor callbacks
+-export([init/1]).
+
+start_link(IniFilename) ->
+ case whereis(couch_server_sup) of
+ undefined ->
+ start_server(IniFilename);
+ _Else ->
+ {error, already_started}
+ end.
+
+start_server("") ->
+ % no ini file specified, check the command line args
+ IniFile =
+ case init:get_argument(couchini) of
+ {ok, [CmdLineIniFilename]} ->
+ CmdLineIniFilename;
+ _Else ->
+ ?DEFAULT_INI
+ end,
+ start_server(IniFile);
+start_server(InputIniFilename) ->
+
+ case init:get_argument(pidfile) of
+ {ok, [PidFile]} ->
+ case file:write_file(PidFile, os:getpid()) of
+ ok -> ok;
+ Error -> io:format("Failed to write PID file ~s, error: ~p", [PidFile, Error])
+ end;
+ _ -> ok
+ end,
+
+ {ok, Cwd} = file:get_cwd(),
+ IniFilename = couch_util:abs_pathname(InputIniFilename),
+ IniBin =
+ case file:read_file(IniFilename) of
+ {ok, IniBin0} ->
+ IniBin0;
+ {error, enoent} ->
+ Msg = io_lib:format("Couldn't find server configuration file ~s.", [InputIniFilename]),
+ io:format("~s~n", [Msg]),
+ throw({startup_error, Msg})
+ end,
+ {ok, Ini} = couch_util:parse_ini(binary_to_list(IniBin)),
+
+ ConsoleStartupMsg = proplists:get_value({"Couch", "ConsoleStartupMsg"}, Ini, "Apache CouchDB is starting."),
+ LogLevel = list_to_atom(proplists:get_value({"Couch", "LogLevel"}, Ini, "error")),
+ DbRootDir = proplists:get_value({"Couch", "DbRootDir"}, Ini, "."),
+ HttpConfigFile = proplists:get_value({"Couch", "HttpConfigFile"}, Ini, "couch_httpd.conf"),
+ LogFile = proplists:get_value({"Couch", "LogFile"}, Ini, "couchdb.log"),
+ UtilDriverDir = proplists:get_value({"Couch", "UtilDriverDir"}, Ini, ""),
+ UpdateNotifierExes = proplists:get_all_values({"Couch", "DbUpdateNotificationProcess"}, Ini),
+ FtSearchQueryServer = proplists:get_value({"Couch", "FullTextSearchQueryServer"}, Ini, ""),
+ RemoteRestart = list_to_atom(proplists:get_value({"Couch", "AllowRemoteRestart"}, Ini, "undefined")),
+ ServerOptions = [{remote_restart, RemoteRestart}],
+ QueryServers = [{Lang, QueryExe} || {{"Couch Query Servers", Lang}, QueryExe} <- Ini],
+
+ ChildProcesses =
+ [{couch_log,
+ {couch_log, start_link, [LogFile, LogLevel]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_server]},
+ {couch_db_update_event,
+ {gen_event, start_link, [{local, couch_db_update}]},
+ permanent,
+ 1000,
+ supervisor,
+ dynamic},
+ {couch_server,
+ {couch_server, sup_start_link, [DbRootDir, ServerOptions]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_server]},
+ {couch_util,
+ {couch_util, start_link, [UtilDriverDir]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_util]},
+ {couch_query_servers,
+ {couch_query_servers, start_link, [QueryServers]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_query_servers]},
+ {couch_view,
+ {couch_view, start_link, [DbRootDir]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_view]},
+ {httpd,
+ {httpd, start_link, [HttpConfigFile]},
+ permanent,
+ 1000,
+ supervisor,
+ [httpd]}
+ ] ++
+ lists:map(fun(UpdateNotifierExe) ->
+ {UpdateNotifierExe,
+ {couch_db_update_notifier, start_link, [UpdateNotifierExe]},
+ permanent,
+ 1000,
+ supervisor,
+ [couch_db_update_notifier]}
+ end, UpdateNotifierExes)
+ ++
+ case FtSearchQueryServer of
+ "" ->
+ [];
+ _ ->
+ [{couch_ft_query,
+ {couch_ft_query, start_link, [FtSearchQueryServer]},
+ permanent,
+ 1000,
+ supervisor,
+ [httpd]}]
+ end,
+
+ io:format("couch ~s (LogLevel=~s)~n", [couch_server:get_version(), LogLevel]),
+ io:format("~s~n", [ConsoleStartupMsg]),
+
+ process_flag(trap_exit, true),
+ StartResult = (catch supervisor:start_link(
+ {local, couch_server_sup}, couch_server_sup, ChildProcesses)),
+
+ ConfigInfo = io_lib:format("Config Info ~s:~n\tCurrentWorkingDir=~s~n" ++
+ "\tDbRootDir=~s~n" ++
+ "\tHttpConfigFile=~s~n" ++
+ "\tLogFile=~s~n" ++
+ "\tUtilDriverDir=~s~n" ++
+ "\tDbUpdateNotificationProcesses=~s~n" ++
+ "\tFullTextSearchQueryServer=~s~n" ++
+ "~s",
+ [IniFilename,
+ Cwd,
+ DbRootDir,
+ HttpConfigFile,
+ LogFile,
+ UtilDriverDir,
+ UpdateNotifierExes,
+ FtSearchQueryServer,
+ [lists:flatten(io_lib:format("\t~s=~s~n", [Lang, QueryExe])) || {Lang, QueryExe} <- QueryServers]]),
+ couch_log:debug("~s", [ConfigInfo]),
+
+ case StartResult of
+ {ok,_} ->
+ % only output when startup was successful
+ io:format("Apache CouchDB has started. Time to relax.~n");
+ _ ->
+ % Since we failed startup, unconditionally dump configuration data to console
+ io:format("~s", [ConfigInfo]),
+ ok
+ end,
+ process_flag(trap_exit, false),
+ StartResult.
+
+stop() ->
+ catch exit(whereis(couch_server_sup), normal),
+ couch_log:stop().
+
+init(ChildProcesses) ->
+ {ok, {{one_for_one, 10, 3600}, ChildProcesses}}.
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
new file mode 100644
index 00000000..d5157b4d
--- /dev/null
+++ b/src/couchdb/couch_stream.erl
@@ -0,0 +1,252 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_stream).
+-behaviour(gen_server).
+
+-export([test/1]).
+-export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]).
+-export([copy/4]).
+-export([ensure_buffer/2, set_min_buffer/2]).
+-export([init/1, terminate/2, handle_call/3]).
+-export([handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+-define(FILE_POINTER_BYTES, 8).
+-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
+
+-define(STREAM_OFFSET_BYTES, 4).
+-define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)).
+
+-define(HUGE_CHUNK, 1000000000). % Huge chunk size when reading all in one go
+
+-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
+
+
+-record(write_stream,
+ {fd = 0,
+ current_pos = 0,
+ bytes_remaining = 0,
+ next_alloc = 0,
+ min_alloc = 16#00010000
+ }).
+
+-record(stream,
+ {
+ pid,
+ fd
+ }).
+
+
+%%% Interface functions %%%
+
+open(Fd) ->
+ open(nil, Fd).
+
+open(nil, Fd) ->
+ open({0,0}, Fd);
+open(State, Fd) ->
+ {ok, Pid} = gen_server:start_link(couch_stream, {State, Fd}, []),
+ {ok, #stream{pid = Pid, fd = Fd}}.
+
+close(#stream{pid = Pid, fd = _Fd}) ->
+ gen_server:call(Pid, close).
+
+get_state(#stream{pid = Pid, fd = _Fd}) ->
+ gen_server:call(Pid, get_state).
+
+ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
+ gen_server:call(Pid, {ensure_buffer, Bytes}).
+
+set_min_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
+ gen_server:call(Pid, {set_min_buffer, Bytes}).
+
+read(#stream{pid = _Pid, fd = Fd}, Sp, Num) ->
+ read(Fd, Sp, Num);
+read(Fd, Sp, Num) ->
+ {ok, RevBin, Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> {ok, [Bin | Acc]} end, []),
+ Bin = list_to_binary(lists:reverse(RevBin)),
+ {ok, Bin, Sp2}.
+
+copy(#stream{pid = _Pid, fd = Fd}, Sp, Num, DestStream) ->
+ copy(Fd, Sp, Num, DestStream);
+copy(Fd, Sp, Num, DestStream) ->
+ {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK,
+ fun(Bin, AccPointer) ->
+            % write/2 takes the destination stream first, then the binary,
+            % and the fold fun must return {ok, Acc} for stream_data to recurse
+            {ok, NewPointer} = write(DestStream, Bin),
+            {ok, if AccPointer == null -> NewPointer; true -> AccPointer end}
+ end,
+ null),
+ {ok, NewSp}.
+
+foldl(#stream{pid = _Pid, fd = Fd}, Sp, Num, Fun, Acc) ->
+ foldl(Fd, Sp, Num, Fun, Acc);
+foldl(Fd, Sp, Num, Fun, Acc) ->
+ {ok, _Acc, _Sp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
+
+read_term(#stream{pid = _Pid, fd = Fd}, Sp) ->
+ read_term(Fd, Sp);
+read_term(Fd, Sp) ->
+ {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
+ = read(Fd, Sp, ?STREAM_OFFSET_BYTES),
+ {ok, Bin, _Sp3} = read(Fd, Sp2, TermLen),
+ {ok, binary_to_term(Bin)}.
+
+write_term(Stream, Term) ->
+ Bin = term_to_binary(Term),
+ Size = size(Bin),
+ Bin2 = <<Size:(?STREAM_OFFSET_BITS), Bin/binary>>,
+ write(Stream, Bin2).
+
+write(#stream{}, <<>>) ->
+ {ok, {0,0}};
+write(#stream{pid = Pid}, Bin) when is_binary(Bin) ->
+ gen_server:call(Pid, {write, Bin}).
+
+
+init({{Pos, BytesRemaining}, Fd}) ->
+ {ok, #write_stream
+ {fd = Fd,
+ current_pos = Pos,
+ bytes_remaining = BytesRemaining
+ }}.
+
+terminate(_Reason, _Stream) ->
+ ok.
+
+handle_call(get_state, _From, Stream) ->
+ #write_stream{current_pos = Pos, bytes_remaining = BytesRemaining} = Stream,
+ {reply, {Pos, BytesRemaining}, Stream};
+handle_call({set_min_buffer, MinBuffer}, _From, Stream) ->
+ {reply, ok, Stream#write_stream{min_alloc = MinBuffer}};
+handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) ->
+ #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream,
+ case BytesRemainingInCurrentBuffer < BufferSizeRequested of
+ true -> NextAlloc = BufferSizeRequested - BytesRemainingInCurrentBuffer;
+ false -> NextAlloc = 0 % enough room in current segment
+ end,
+ {reply, ok, Stream#write_stream{next_alloc = NextAlloc}};
+handle_call({write, Bin}, _From, Stream) ->
+    % ensure init is called first so we can get a pointer to the beginning of the binary
+ {ok, Sp, Stream2} = write_data(Stream, Bin),
+ {reply, {ok, Sp}, Stream2};
+handle_call(close, _From, Stream) ->
+ #write_stream{current_pos=Pos, bytes_remaining = BytesRemaining} = Stream,
+ {stop, normal, {ok, {Pos, BytesRemaining}}, Stream}.
+
+handle_cast(_Msg, State) ->
+ {noreply,State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%% Internal function %%%
+
+stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
+ {ok, Acc, Sp};
+stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
+ {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
+ = couch_file:pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
+ Sp = {NextPos, NextOffset},
+ % Check NextPos is past current Pos (this is always true in a stream)
+ % Guards against potential infinite loops caused by corruption.
+ case NextPos > Pos of
+ true -> ok;
+ false -> throw({error, stream_corruption})
+ end,
+ stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
+stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
+ ReadAmount = lists:min([MaxChunk, Num, Offset]),
+ {ok, Bin} = couch_file:pread(Fd, Pos, ReadAmount),
+ Sp = {Pos + ReadAmount, Offset - ReadAmount},
+ case Fun(Bin, Acc) of
+ {ok, Acc2} ->
+ stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Acc2);
+ {stop, Acc2} ->
+ {ok, Acc2, Sp}
+ end.
+
+write_data(Stream, <<>>) ->
+ {ok, {0,0}, Stream};
+write_data(#write_stream{bytes_remaining=0} = Stream, Bin) ->
+ #write_stream {
+ fd = Fd,
+ current_pos = CurrentPos,
+ next_alloc = NextAlloc,
+ min_alloc = MinAlloc
+ }= Stream,
+
+ NewSize = lists:max([MinAlloc, NextAlloc, size(Bin)]),
+ % no space in the current segment, must alloc a new segment
+ {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
+
+ case CurrentPos of
+ 0 ->
+ ok;
+ _ ->
+ ok = couch_file:pwrite(Fd, CurrentPos, <<NewPos:(?FILE_POINTER_BITS), NewSize:(?STREAM_OFFSET_BITS)>>)
+ end,
+ Stream2 = Stream#write_stream{
+ current_pos=NewPos,
+ bytes_remaining=NewSize,
+ next_alloc=0},
+ write_data(Stream2, Bin);
+write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=BytesRemaining} = Stream, Bin) ->
+ BytesToWrite = lists:min([size(Bin), BytesRemaining]),
+ {WriteBin, Rest} = split_binary(Bin, BytesToWrite),
+ ok = couch_file:pwrite(Fd, Pos, WriteBin),
+ Stream2 = Stream#write_stream{
+ bytes_remaining=BytesRemaining - BytesToWrite,
+ current_pos=Pos + BytesToWrite
+ },
+ {ok, _, Stream3} = write_data(Stream2, Rest),
+ {ok, {Pos, BytesRemaining}, Stream3}.
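+
+% On disk a stream is a chain of segments. Each segment holds raw data; when
+% a segment fills, a 12-byte pointer (?FILE_POINTER_BYTES bytes of file
+% position plus ?STREAM_OFFSET_BYTES bytes of next-segment size) is written
+% at the current position, which is exactly what the {Pos, 0} clause of
+% stream_data/6 follows when reading the stream back.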
+
+
+
+%%% Tests %%%
+
+
+test(Term) ->
+ {ok, Fd} = couch_file:open("foo", [write]),
+ {ok, Stream} = open({0,0}, Fd),
+ {ok, Pos} = write_term(Stream, Term),
+ {ok, Pos2} = write_term(Stream, {Term, Term}),
+ close(Stream),
+ couch_file:close(Fd),
+ {ok, Fd2} = couch_file:open("foo", [read, write]),
+ {ok, Stream2} = open({0,0}, Fd2),
+ {ok, Term1} = read_term(Fd2, Pos),
+ io:format("Term1: ~w ~n",[Term1]),
+ {ok, Term2} = read_term(Fd2, Pos2),
+ io:format("Term2: ~w ~n",[Term2]),
+ {ok, PointerList} = deep_write_test(Stream2, Term, 1000, []),
+ deep_read_test(Fd2, PointerList),
+ close(Stream2),
+ couch_file:close(Fd2).
+
+deep_read_test(_Fd, []) ->
+ ok;
+deep_read_test(Fd, [Pointer | RestPointerList]) ->
+ {ok, _Term} = read_term(Fd, Pointer),
+ deep_read_test(Fd, RestPointerList).
+
+deep_write_test(_Stream, _Term, 0, PointerList) ->
+ {ok, PointerList};
+deep_write_test(Stream, Term, N, PointerList) ->
+ WriteList = lists:duplicate(random:uniform(N), Term),
+ {ok, Pointer} = write_term(Stream, WriteList),
+ deep_write_test(Stream, Term, N-1, [Pointer | PointerList]).
diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl
new file mode 100644
index 00000000..42845fe0
--- /dev/null
+++ b/src/couchdb/couch_util.erl
@@ -0,0 +1,316 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_util).
+-behaviour(gen_server).
+
+-export([start_link/0,start_link/1]).
+-export([parse_ini/1]).
+-export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]).
+-export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1, test/0]).
+-export([encodeBase64/1, decodeBase64/1]).
+
+-export([init/1, terminate/2, handle_call/3]).
+-export([handle_cast/2,code_change/3,handle_info/2]).
+
+
+start_link() ->
+ start_link("").
+
+start_link("") ->
+ start_link(filename:join(code:priv_dir(couch), "lib"));
+start_link(LibDir) ->
+ case erl_ddll:load_driver(LibDir, "couch_erl_driver") of
+ ok -> ok;
+ {error, already_loaded} -> ok;
+ {error, ErrorDesc} -> exit({error, ErrorDesc})
+ end,
+ gen_server:start_link({local, couch_util}, couch_util, [], []).
+
+
+new_uuid() ->
+ gen_server:call(couch_util, new_uuid).
+
+% returns a random integer
+rand32() ->
+ gen_server:call(couch_util, rand32).
+
+% given a pathname "../foo/bar/" it gives back the fully qualified
+% absolute pathname.
+abs_pathname(" " ++ Filename) ->
+    % strip leading whitespace
+ abs_pathname(Filename);
+abs_pathname([$/ |_]=Filename) ->
+ Filename;
+abs_pathname(Filename) ->
+ {ok, Cwd} = file:get_cwd(),
+ {Filename2, Args} = separate_cmd_args(Filename, ""),
+ abs_pathname(Filename2, Cwd) ++ Args.
+
+abs_pathname(Filename, Dir) ->
+ Name = filename:absname(Filename, Dir ++ "/"),
+ OutFilename = filename:join(fix_path_list(filename:split(Name), [])),
+    % If the filename is a dir (last char is a slash), put back the trailing slash
+ case string:right(Filename,1) of
+ "/" ->
+ OutFilename ++ "/";
+ "\\" ->
+ OutFilename ++ "/";
+ _Else->
+ OutFilename
+ end.
+
+% if this is an executable with arguments, separate out the arguments
+% "./foo\ bar.sh -baz=blah" -> {"./foo\ bar.sh", " -baz=blah"}
+separate_cmd_args("", CmdAcc) ->
+ {lists:reverse(CmdAcc), ""};
+separate_cmd_args("\\ " ++ Rest, CmdAcc) -> % handle skipped value
+ separate_cmd_args(Rest, " \\" ++ CmdAcc);
+separate_cmd_args(" " ++ Rest, CmdAcc) ->
+ {lists:reverse(CmdAcc), " " ++ Rest};
+separate_cmd_args([Char|Rest], CmdAcc) ->
+ separate_cmd_args(Rest, [Char | CmdAcc]).
+
+% lowercases string bytes that are the ascii characters A-Z.
+% All other characters/bytes are passed through unchanged.
+ascii_lower(String) ->
+ ascii_lower(String, []).
+
+ascii_lower([], Acc) ->
+ lists:reverse(Acc);
+ascii_lower([Char | RestString], Acc) when Char >= $A, Char =< $Z ->
+ ascii_lower(RestString, [Char + ($a-$A) | Acc]);
+ascii_lower([Char | RestString], Acc)->
+ ascii_lower(RestString, [Char | Acc]).
+
+% Is a character whitespace?
+is_whitespace($\s)-> true;
+is_whitespace($\t)-> true;
+is_whitespace($\n)-> true;
+is_whitespace($\r)-> true;
+is_whitespace(_Else) -> false.
+
+
+% removes leading and trailing whitespace from a string
+trim(String) ->
+ String2 = lists:dropwhile(fun is_whitespace/1, String),
+ lists:reverse(lists:dropwhile(fun is_whitespace/1, lists:reverse(String2))).
+
+% takes a hierarchical list of dirs and removes the dots ".", double dots
+% ".." and the corresponding parent dirs.
+fix_path_list([], Acc) ->
+ lists:reverse(Acc);
+fix_path_list([".."|Rest], [_PrevAcc|RestAcc]) ->
+ fix_path_list(Rest, RestAcc);
+fix_path_list(["."|Rest], Acc) ->
+ fix_path_list(Rest, Acc);
+fix_path_list([Dir | Rest], Acc) ->
+ fix_path_list(Rest, [Dir | Acc]).
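+
+% example:
+%   fix_path_list(["a", "b", "..", "c"], []) -> ["a", "c"]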
+
+
+implode(List, Sep) ->
+ implode(List, Sep, []).
+
+implode([], _Sep, Acc) ->
+ lists:flatten(lists:reverse(Acc));
+implode([H], Sep, Acc) ->
+ implode([], Sep, [H|Acc]);
+implode([H|T], Sep, Acc) ->
+ implode(T, Sep, [Sep,H|Acc]).
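+
+% examples:
+%   implode(["a", "b", "c"], "-") -> "a-b-c"
+%   implode([], "-")              -> []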
+
+
+% This is a simple ini parser. It just converts the string
+% contents of a file like this:
+%
+%; comments are ignored
+%;commentedoutvariable=foo
+%this is line that gets ignored because it has no equals sign
+%[this line gets ignored because it starts with a bracket but doesn't end with one
+%bloodtype=Ragu
+%[Some Section]
+%timeout=30
+%Default=zuh ; another comment (leading space or tab before a semi is necessary to be a comment if not at beginning of line)
+%[Another Section]
+%key with spaces=a value with stuff; and no comment
+%oops="it doesn't qet quoted strings with semis quite right ; it thinks it's part comment"
+%
+%And converts it into this:
+%[{{"","bloodtype"},"Ragu"},
+% {{"Some Section","timeout"},"30"},
+% {{"Some section","Default"}, "zuh"},
+% {{"Another Section", "key with spaces"}, "a value with stuff; and no comment"},
+% {{"Another Section", "oops"}, "\"it doesn't qet quoted strings with semis quite right"}]
+%
+
+parse_ini(FileContents) ->
+ {ok, Lines} = regexp:split(FileContents, "\r\n|\n|\r|\032"),
+ {_, ParsedIniValues} =
+ lists:foldl(fun(Line, {AccSectionName, AccValues}) ->
+ case string:strip(Line) of
+ "[" ++ Rest ->
+ case regexp:split(Rest, "\\]") of
+ {ok, [NewSectionName, ""]} ->
+ {NewSectionName, AccValues};
+ _Else -> % end bracket not at end, ignore this line
+ {AccSectionName, AccValues}
+ end;
+ ";" ++ _Comment ->
+ {AccSectionName, AccValues};
+ Line2 ->
+ case regexp:split(Line2, "=") of
+ {ok, [_SingleElement]} -> % no "=" found, ignore this line
+ {AccSectionName, AccValues};
+ {ok, [""|_LineValues]} -> % line begins with "=", ignore
+ {AccSectionName, AccValues};
+ {ok, [ValueName|LineValues]} -> % yeehaw, got a line!
+ RemainingLine = implode(LineValues, "="),
+ {ok, [LineValue | _Rest]} = regexp:split(RemainingLine, " ;|\t;"), % removes comments
+ {AccSectionName, [{{AccSectionName, ValueName}, LineValue} | AccValues]}
+ end
+ end
+ end, {"", []}, Lines),
+ {ok, lists:reverse(ParsedIniValues)}.
+
+init([]) ->
+ {A,B,C} = erlang:now(),
+ random:seed(A,B,C),
+ {ok, dummy_server}.
+
+terminate(_Reason, _Server) ->
+ ok.
+
+handle_call(new_uuid, _From, Server) ->
+ {reply, new_uuid_int(), Server};
+handle_call(rand32, _From, Server) ->
+ {reply, rand32_int(), Server}.
+
+handle_cast(_Msg, State) ->
+ {noreply,State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+
+new_uuid_int() ->
+ % eventually make this a C callout for a real guid (collisions are far less likely
+ % when using a proper generation function). For now we just fake it.
+ Num1 = random:uniform(16#FFFFFFFF + 1) - 1,
+ Num2 = random:uniform(16#FFFFFFFF + 1) - 1,
+ Num3 = random:uniform(16#FFFFFFFF + 1) - 1,
+ Num4 = random:uniform(16#FFFFFFFF + 1) - 1,
+ lists:flatten(io_lib:format("~8.16.0B~8.16.0B~8.16.0B~8.16.0B", [Num1, Num2, Num3, Num4])).
+
+
+
+rand32_int() ->
+ random:uniform(16#FFFFFFFF + 1) - 1.
+
+drv_port() ->
+ case get(couch_drv_port) of
+ undefined ->
+ Port = open_port({spawn, "couch_erl_driver"}, []),
+ put(couch_drv_port, Port),
+ Port;
+ Port ->
+ Port
+ end.
+
+collate(A, B) ->
+ collate(A, B, []).
+
+collate(A, B, Options) when is_list(A), is_list(B) ->
+ Operation =
+ case lists:member(nocase, Options) of
+ true -> 1; % Case insensitive
+ false -> 0 % Case sensitive
+ end,
+ Port = drv_port(),
+ LenA = length(A),
+ LenB = length(B),
+ Bin = list_to_binary([<<LenA:32/native>>, A, <<LenB:32/native>>, B]),
+ case erlang:port_control(Port, Operation, Bin) of
+ [0] -> -1;
+ [1] -> 1;
+ [2] -> 0
+ end.
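+
+% usage sketch (assumes couch_util:start_link/1 has already loaded the ICU
+% driver; the inputs are illustrative):
+%   -1 = couch_util:collate("apple", "banana"),
+%    0 = couch_util:collate("FOO", "foo", [nocase])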
+
+
+
+
+%%% Purpose : Base 64 encoding and decoding.
+%%% Copied from ssl_base_64 to avoid using the
+%%% erlang ssl library
+
+-define(st(X,A), ((X-A+256) div 256)).
+-define(CHARS, 64).
+
+%% A PEM encoding consists of characters A-Z, a-z, 0-9, +, / and
+%% =. Each character encodes a 6 bits value from 0 to 63 (A = 0, / =
+%% 63); = is a padding character.
+%%
+
+%%
+%% encode64(Bytes|Binary) -> Chars
+%%
+%% Take 3 bytes a time (3 x 8 = 24 bits), and make 4 characters out of
+%% them (4 x 6 = 24 bits).
+%%
+encodeBase64(Bs) when is_list(Bs) ->
+ encodeBase64(list_to_binary(Bs));
+encodeBase64(<<B:3/binary, Bs/binary>>) ->
+ <<C1:6, C2:6, C3:6, C4:6>> = B,
+ [enc(C1), enc(C2), enc(C3), enc(C4)| encodeBase64(Bs)];
+encodeBase64(<<B:2/binary>>) ->
+ <<C1:6, C2:6, C3:6, _:6>> = <<B/binary, 0>>,
+ [enc(C1), enc(C2), enc(C3), $=];
+encodeBase64(<<B:1/binary>>) ->
+ <<C1:6, C2:6, _:12>> = <<B/binary, 0, 0>>,
+ [enc(C1), enc(C2), $=, $=];
+encodeBase64(<<>>) ->
+ [].
+
+%%
+%% decodeBase64(Chars) -> Binary
+%%
+decodeBase64(Cs) ->
+ list_to_binary(decode1(Cs)).
+
+decode1([C1, C2, $=, $=]) ->
+ <<B1, _:16>> = <<(dec(C1)):6, (dec(C2)):6, 0:12>>,
+ [B1];
+decode1([C1, C2, C3, $=]) ->
+ <<B1, B2, _:8>> = <<(dec(C1)):6, (dec(C2)):6, (dec(C3)):6, (dec(0)):6>>,
+ [B1, B2];
+decode1([C1, C2, C3, C4| Cs]) ->
+ Bin = <<(dec(C1)):6, (dec(C2)):6, (dec(C3)):6, (dec(C4)):6>>,
+ [Bin| decode1(Cs)];
+decode1([]) ->
+ [].
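+
+% examples:
+%   encodeBase64(<<"foo">>) -> "Zm9v"
+%   decodeBase64("Zm9v")    -> <<"foo">>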
+
+%% enc/1 and dec/1
+%%
+%% Mapping: 0-25 -> A-Z, 26-51 -> a-z, 52-61 -> 0-9, 62 -> +, 63 -> /
+%%
+enc(C) ->
+ 65 + C + 6*?st(C,26) - 75*?st(C,52) -15*?st(C,62) + 3*?st(C,63).
+
+dec(C) ->
+ 62*?st(C,43) + ?st(C,47) + (C-59)*?st(C,48) - 69*?st(C,65) - 6*?st(C,97).
+
+
+
+test() ->
+ start_link("debug"),
+ collate("a","b",[]).
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
new file mode 100644
index 00000000..612eb5fd
--- /dev/null
+++ b/src/couchdb/couch_view.erl
@@ -0,0 +1,616 @@
+% Copyright 2007, 2008 Damien Katz <damien_katz@yahoo.com>
+%
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+% You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+
+-module(couch_view).
+-behaviour(gen_server).
+
+-export([start_link/1,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/4]).
+-export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]).
+
+-include("couch_db.hrl").
+
+% arbitrarily chosen amount of memory to use before flushing to disk
+-define(FLUSH_MAX_MEM, 10000000).
+
+-record(group,
+ {db,
+ fd,
+ name,
+ def_lang,
+ views,
+ id_btree,
+ current_seq,
+ query_server=nil
+ }).
+
+-record(view,
+ {id_num,
+ name,
+ btree,
+ def
+ }).
+
+-record(server,
+ {root_dir
+ }).
+
+start_link(RootDir) ->
+ gen_server:start_link({local, couch_view}, couch_view, RootDir, []).
+
+
+
+get_temp_updater(DbName, Type, Src) ->
+ {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, Src}),
+ Pid.
+
+get_updater(DbName, GroupId) ->
+ {ok, Pid} = gen_server:call(couch_view, {start_updater, DbName, GroupId}),
+ Pid.
+
+get_updated_group(Pid) ->
+ Mref = erlang:monitor(process, Pid),
+ receive
+ {'DOWN', Mref, _, _, Reason} ->
+ throw(Reason)
+ after 0 ->
+ Pid ! {self(), get_updated},
+ receive
+ {Pid, Response} ->
+ erlang:demonitor(Mref),
+ receive
+ {'DOWN', Mref, _, _, _} ->
+ Response
+ after 0 ->
+ Response
+ end;
+ {'DOWN', Mref, _, _, Reason} ->
+ throw(Reason)
+ end
+ end.
+
+fold(ViewInfo, Dir, Fun, Acc) ->
+ fold(ViewInfo, nil, Dir, Fun, Acc).
+
+fold({temp, DbName, Type, Src}, StartKey, Dir, Fun, Acc) ->
+ {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src)),
+ fold_view(View#view.btree, StartKey, Dir, Fun, Acc);
+fold({DbName, GroupId, ViewName}, StartKey, Dir, Fun, Acc) ->
+ {ok, #group{views=Views}} = get_updated_group(get_updater(DbName, GroupId)),
+ Btree = get_view_btree(Views, ViewName),
+ fold_view(Btree, StartKey, Dir, Fun, Acc).
+
+fold_view(Btree, StartKey, Dir, Fun, Acc) ->
+ TotalRowCount = couch_btree:row_count(Btree),
+ WrapperFun = fun({{Key, DocId}, Value}, Offset, WrapperAcc) ->
+ Fun(DocId, Key, Value, Offset, TotalRowCount, WrapperAcc)
+ end,
+ {ok, AccResult} = couch_btree:fold(Btree, StartKey, Dir, WrapperFun, Acc),
+ {ok, TotalRowCount, AccResult}.
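+
+% rough usage sketch (assumptions: Dir is the atom fwd, and GroupId is the
+% design document id opened by start_update_loop/3, e.g. "_design/foo"):
+%   Fun = fun(DocId, Key, Value, _Offset, _Total, Acc) ->
+%       {ok, [{Key, DocId, Value} | Acc]}
+%   end,
+%   {ok, _Total, Rows} = couch_view:fold({"mydb", "_design/foo", "myview"}, fwd, Fun, [])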
+
+
+get_view_btree([], _ViewName) ->
+ throw({not_found, missing_named_view});
+get_view_btree([View | _RestViews], ViewName) when View#view.name == ViewName ->
+ View#view.btree;
+get_view_btree([_View | RestViews], ViewName) ->
+ get_view_btree(RestViews, ViewName).
+
+
+init(RootDir) ->
+ UpdateNotifierFun =
+ fun({deleted, DbName}) ->
+ gen_server:cast(couch_view, {reset_indexes, DbName});
+ ({created, DbName}) ->
+ gen_server:cast(couch_view, {reset_indexes, DbName});
+ (_Else) ->
+ ok
+ end,
+ couch_db_update_notifier:start_link(UpdateNotifierFun),
+ ets:new(couch_views_by_db, [bag, private, named_table]),
+ ets:new(couch_views_by_name, [set, protected, named_table]),
+ ets:new(couch_views_by_updater, [set, private, named_table]),
+ ets:new(couch_views_temp_fd_by_db, [set, protected, named_table]),
+ process_flag(trap_exit, true),
+ {ok, #server{root_dir=RootDir}}.
+
+terminate(_Reason, _) ->
+ catch ets:delete(couch_views_by_name),
+ catch ets:delete(couch_views_by_updater),
+ catch ets:delete(couch_views_by_db),
+ catch ets:delete(couch_views_temp_fd_by_db).
+
+
+handle_call({start_temp_updater, DbName, Lang, Query}, _From, #server{root_dir=Root}=Server) ->
+ <<SigInt:128/integer>> = erlang:md5(Lang ++ Query),
+ Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])),
+ Pid =
+ case ets:lookup(couch_views_by_name, {DbName, Name}) of
+ [] ->
+ case ets:lookup(couch_views_temp_fd_by_db, DbName) of
+ [] ->
+ FileName = Root ++ "/." ++ DbName ++ "_temp",
+ {ok, Fd} = couch_file:open(FileName, [create, overwrite]),
+ Count = 0;
+ [{_, Fd, Count}] ->
+ ok
+ end,
+ couch_log:debug("Spawning new temp update process for db ~s.", [DbName]),
+ NewPid = spawn_link(couch_view, start_temp_update_loop, [DbName, Fd, Lang, Query]),
+ true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}),
+ add_to_ets(NewPid, DbName, Name),
+ NewPid;
+ [{_, ExistingPid0}] ->
+ ExistingPid0
+ end,
+ {reply, {ok, Pid}, Server};
+handle_call({start_updater, DbName, GroupId}, _From, #server{root_dir=Root}=Server) ->
+ Pid =
+ case ets:lookup(couch_views_by_name, {DbName, GroupId}) of
+ [] ->
+ couch_log:debug("Spawning new update process for view group ~s in database ~s.", [GroupId, DbName]),
+ NewPid = spawn_link(couch_view, start_update_loop, [Root, DbName, GroupId]),
+ add_to_ets(NewPid, DbName, GroupId),
+ NewPid;
+ [{_, ExistingPid0}] ->
+ ExistingPid0
+ end,
+ {reply, {ok, Pid}, Server}.
+
+handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->
+ % shutdown all the updaters
+ Names = ets:lookup(couch_views_by_db, DbName),
+ lists:foreach(
+ fun({_DbName, GroupId}) ->
+ couch_log:debug("Killing update process for view group ~s. in database ~s.", [GroupId, DbName]),
+ [{_, Pid}] = ets:lookup(couch_views_by_name, {DbName, GroupId}),
+ exit(Pid, kill),
+ receive {'EXIT', Pid, _} ->
+ delete_from_ets(Pid, DbName, GroupId)
+ end
+ end, Names),
+ delete_index_dir(Root, DbName),
+ file:delete(Root ++ "/." ++ DbName ++ "_temp"),
+ {noreply, Server}.
+
+handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) ->
+ case ets:lookup(couch_views_by_updater, FromPid) of
+ [] -> % non-updater linked process must have died, we propagate the error
+ exit(Reason);
+ [{_, {DbName, "_temp_" ++ _ = GroupId}}] ->
+ delete_from_ets(FromPid, DbName, GroupId),
+ [{_, Fd, Count}] = ets:lookup(couch_views_temp_fd_by_db, DbName),
+ case Count of
+ 1 -> % Last ref
+ couch_file:close(Fd),
+ file:delete(RootDir ++ "/." ++ DbName ++ "_temp"),
+ true = ets:delete(couch_views_temp_fd_by_db, DbName);
+ _ ->
+ true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count - 1})
+ end;
+ [{_, {DbName, GroupId}}] ->
+ delete_from_ets(FromPid, DbName, GroupId)
+ end,
+ {noreply, Server}.
+
+add_to_ets(Pid, DbName, GroupId) ->
+ true = ets:insert(couch_views_by_updater, {Pid, {DbName, GroupId}}),
+ true = ets:insert(couch_views_by_name, {{DbName, GroupId}, Pid}),
+ true = ets:insert(couch_views_by_db, {DbName, GroupId}).
+
+delete_from_ets(Pid, DbName, GroupId) ->
+ true = ets:delete(couch_views_by_updater, Pid),
+ true = ets:delete(couch_views_by_name, {DbName, GroupId}),
+ true = ets:delete_object(couch_views_by_db, {DbName, GroupId}).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+start_update_loop(RootDir, DbName, GroupId) ->
+ % wait for a notify request before doing anything. This way, we can just
+ % exit and any exits will be noticed by the callers.
+ start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).
+
+
+start_temp_update_loop(DbName, Fd, Lang, Query) ->
+ NotifyPids = get_notify_pids(1000),
+ case couch_server:open(DbName) of
+ {ok, Db} ->
+ View = #view{name="_temp", id_num=0, btree=nil, def=Query},
+ Group = #group{name="_temp",
+ db=Db,
+ views=[View],
+ current_seq=0,
+ def_lang=Lang,
+ id_btree=nil},
+ Group2 = disk_group_to_mem(Fd, Group),
+ temp_update_loop(Group2, NotifyPids);
+ Else ->
+ exit(Else)
+ end.
+
+temp_update_loop(Group, NotifyPids) ->
+ {ok, Group2} = update_group(Group),
+ [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
+ garbage_collect(),
+ temp_update_loop(Group2, get_notify_pids(100000)).
+
+start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
+ {Db, DefLang, Defs} =
+ case couch_server:open(DbName) of
+ {ok, Db0} ->
+ case couch_db:open_doc(Db0, GroupId) of
+ {ok, Doc} ->
+ case couch_doc:get_view_functions(Doc) of
+ none ->
+ delete_index_file(RootDir, DbName, GroupId),
+ exit({not_found, no_views_found});
+ {DefLang0, Defs0} ->
+ {Db0, DefLang0, Defs0}
+ end;
+ Else ->
+ delete_index_file(RootDir, DbName, GroupId),
+ exit(Else)
+ end;
+ Else ->
+ delete_index_file(RootDir, DbName, GroupId),
+ exit(Else)
+ end,
+ Group = open_index_file(RootDir, DbName, GroupId, DefLang, Defs),
+
+ try update_loop(Group#group{db=Db}, NotifyPids) of
+ _ -> ok
+ catch
+ restart ->
+ couch_file:close(Group#group.fd),
+ start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids())
+ end.
+
+update_loop(#group{fd=Fd}=Group, NotifyPids) ->
+ {ok, Group2} = update_group(Group),
+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)),
+ [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
+ garbage_collect(),
+ update_loop(Group2).
+
+update_loop(Group) ->
+    update_loop(Group, get_notify_pids(100000)).
+
+% wait for the first request to come in.
+get_notify_pids(Wait) ->
+ receive
+ {Pid, get_updated} ->
+ [Pid | get_notify_pids()]
+ after Wait ->
+ exit(wait_timeout)
+ end.
+% then keep getting all available and return.
+get_notify_pids() ->
+ receive
+ {Pid, get_updated} ->
+ [Pid | get_notify_pids()]
+ after 0 ->
+ []
+ end.
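+
+% Request/reply protocol sketch (as used by the loops above): a caller asks
+% for a fresh index and blocks until the updater finishes a pass, e.g.
+%
+%   UpdaterPid ! {self(), get_updated},
+%   receive {UpdaterPid, {ok, Group}} -> Group end
+%
+% This is an illustration only; the actual client side lives elsewhere in
+% couch_view.erl.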
+
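+% One incremental update pass: fold over everything changed since current_seq
+% (process_doc/3 batches the docs, flushing early if memory grows), run the
+% remaining batch through the view functions, and write the resulting
+% key/value changes to the btrees. If the sequence didn't advance, there is
+% nothing to write.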
+update_group(#group{db=Db,current_seq=CurrentSeq, views=Views}=Group) ->
+ ViewEmptyKVs = [{View, []} || View <- Views],
+ % compute on all docs modified since we last computed.
+ {ok, {UncomputedDocs, Group2, ViewKVsToAdd, DocIdViewIdKeys, NewSeq}}
+ = couch_db:enum_docs_since(
+ Db,
+ CurrentSeq,
+ fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,
+ {[], Group, ViewEmptyKVs, [], CurrentSeq}
+ ),
+
+ {Group3, Results} = view_compute(Group2, UncomputedDocs),
+ {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
+ couch_query_servers:stop_doc_map(Group3#group.query_server),
+ if CurrentSeq /= NewSeq ->
+ {ok, Group4} = write_changes(Group3, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq),
+ {ok, Group4#group{query_server=nil}};
+ true ->
+ {ok, Group3#group{query_server=nil}}
+ end.
+
+delete_index_dir(RootDir, DbName) ->
+ nuke_dir(RootDir ++ "/." ++ DbName ++ "_design").
+
+nuke_dir(Dir) ->
+ case file:list_dir(Dir) of
+ {error, enoent} -> ok; % doesn't exist
+ {ok, Files} ->
+ lists:foreach(
+ fun(File)->
+ Full = Dir ++ "/" ++ File,
+ case file:delete(Full) of
+ ok -> ok;
+ {error, eperm} ->
+ ok = nuke_dir(Full)
+ end
+ end,
+ Files),
+ ok = file:del_dir(Dir)
+ end.
+
+delete_index_file(RootDir, DbName, GroupId) ->
+ file:delete(RootDir ++ "/." ++ DbName ++ GroupId ++ ".view").
+
+open_index_file(RootDir, DbName, GroupId, ViewLang, ViewDefs) ->
+ FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
+ case couch_file:open(FileName) of
+ {ok, Fd} ->
+ case couch_file:read_header(Fd, <<$r, $c, $k, 0>>) of
+ {ok, #group{views=Views}=Group} ->
+ % validate all the view definitions in the index are correct.
+ case same_view_def(Views, ViewDefs) of
+ true -> disk_group_to_mem(Fd, Group);
+ false -> reset_header(GroupId, Fd, ViewLang, ViewDefs)
+ end;
+ _ ->
+ reset_header(GroupId, Fd, ViewLang, ViewDefs)
+ end;
+ _ ->
+ case couch_file:open(FileName, [create]) of
+ {ok, Fd} ->
+ reset_header(GroupId, Fd, ViewLang, ViewDefs);
+ Error ->
+ throw(Error)
+ end
+ end.
+
+same_view_def([], []) ->
+ true;
+same_view_def(DiskViews, ViewDefs) when DiskViews == [] orelse ViewDefs == [] ->
+ false;
+same_view_def([#view{name=DiskName,def=DiskDef}|RestViews], [{Name, Def}|RestDefs]) ->
+ if DiskName == Name andalso DiskDef == Def ->
+ same_view_def(RestViews, RestDefs);
+ true ->
+ false
+ end.
+
+% Given a disk-ready group structure, return an initialized, in-memory version.
+disk_group_to_mem(Fd, #group{id_btree=IdState,views=Views}=Group) ->
+ {ok, IdBtree} = couch_btree:open(IdState, Fd),
+ Views2 = lists:map(
+ fun(#view{btree=BtreeState}=View) ->
+ {ok, Btree} = couch_btree:open(BtreeState, Fd, [{less, fun less_json/2}]),
+ View#view{btree=Btree}
+ end,
+ Views),
+ Group#group{fd=Fd, id_btree=IdBtree, views=Views2}.
+
+% Given an initialized, in-memory group structure, return a disk-ready version.
+mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) ->
+ Views2 = lists:map(
+ fun(#view{btree=Btree}=View) ->
+ State = couch_btree:get_state(Btree),
+ View#view{btree=State}
+ end,
+ Views),
+ Group#group{fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}.
+
+reset_header(GroupId, Fd, DefLanguage, NamedViews) ->
+ couch_file:truncate(Fd, 0),
+ {Views, _N} = lists:mapfoldl(
+        fun({Name, Definition}, N) ->
+            {#view{name=Name, id_num=N, btree=nil, def=Definition}, N+1}
+ end,
+ 0, NamedViews),
+ Group = #group{name=GroupId,
+ fd=Fd,
+ views=Views,
+ current_seq=0,
+ def_lang=DefLanguage,
+ id_btree=nil},
+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Group),
+ disk_group_to_mem(Fd, Group).
+
+
+
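+% Collation order used for view keys. type_sort/1 ranks the JSON types, so
+% (for example) null < true < 1 < "a" < array < object, with binaries last;
+% values of the same type fall through to less_same_type/2, where strings
+% use ICU collation via couch_util:collate/2.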
+less_json(A, B) ->
+ TypeA = type_sort(A),
+ TypeB = type_sort(B),
+ if
+ TypeA == TypeB ->
+ less_same_type(A,B);
+ true ->
+ TypeA < TypeB
+ end.
+
+type_sort(V) when is_atom(V) -> 0;
+type_sort(V) when is_integer(V) -> 1;
+type_sort(V) when is_float(V) -> 1;
+type_sort(V) when is_list(V) -> 2;
+type_sort({obj, _}) -> 4; % must come before tuple test below
+type_sort(V) when is_tuple(V) -> 3;
+type_sort(V) when is_binary(V) -> 5.
+
+atom_sort(nil) -> 0;
+atom_sort(null) -> 1;
+atom_sort(false) -> 2;
+atom_sort(true) -> 3.
+
+less_same_type(A,B) when is_atom(A) ->
+ atom_sort(A) < atom_sort(B);
+less_same_type(A,B) when is_list(A) ->
+ couch_util:collate(A, B) < 0;
+less_same_type({obj, AProps}, {obj, BProps}) ->
+ less_props(AProps, BProps);
+less_same_type(A, B) when is_tuple(A) ->
+ less_list(tuple_to_list(A),tuple_to_list(B));
+less_same_type(A, B) ->
+ A < B.
+
+ensure_list(V) when is_list(V) -> V;
+ensure_list(V) when is_atom(V) -> atom_to_list(V).
+
+less_props([], [_|_]) ->
+ true;
+less_props(_, []) ->
+ false;
+less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) ->
+ case couch_util:collate(ensure_list(AKey), ensure_list(BKey)) of
+ -1 -> true;
+ 1 -> false;
+ 0 ->
+ case less_json(AValue, BValue) of
+ true -> true;
+ false ->
+ case less_json(BValue, AValue) of
+ true -> false;
+ false ->
+ less_props(RestA, RestB)
+ end
+ end
+ end.
+
+less_list([], [_|_]) ->
+ true;
+less_list(_, []) ->
+ false;
+less_list([A|RestA], [B|RestB]) ->
+ case less_json(A,B) of
+ true -> true;
+ false ->
+ case less_json(B,A) of
+ true -> false;
+ false ->
+ less_list(RestA, RestB)
+ end
+ end.
+
+process_doc(Db, DocInfo, {Docs, #group{name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) ->
+    % This fun is called once for each document
+ #doc_info{id=DocId, update_seq=Seq, deleted=Deleted} = DocInfo,
+ case DocId of
+ GroupId ->
+        % uh oh, this is the design doc with our definitions. See if
+        % anything in the definitions changed.
+ case couch_db:open_doc(Db, DocInfo) of
+ {ok, Doc} ->
+ case couch_doc:get_view_functions(Doc) of
+ none ->
+ throw(restart);
+ {DefLang, NewDefs} ->
+ case Group#group.def_lang == DefLang andalso same_view_def(Group#group.views, NewDefs) of
+ true ->
+                    % nothing changed, keep on computing
+ {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
+ false ->
+ throw(restart)
+ end
+ end;
+ {not_found, deleted} ->
+ throw(restart)
+ end;
+ ?DESIGN_DOC_PREFIX ++ _ -> % we skip design docs
+ {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
+ _ ->
+ {Docs2, DocIdViewIdKeys2} =
+ if Deleted ->
+ {Docs, [{DocId, []} | DocIdViewIdKeys]};
+ true ->
+ {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]),
+ {[Doc | Docs], DocIdViewIdKeys}
+ end,
+ case process_info(self(), memory) of
+ {memory, Mem} when Mem > ?FLUSH_MAX_MEM ->
+ {Group1, Results} = view_compute(Group, Docs2),
+ {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2),
+ {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, Seq),
+ garbage_collect(),
+ ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],
+ {ok, {[], Group2, ViewEmptyKeyValues, [], Seq}};
+ _Else ->
+ {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2, Seq}}
+ end
+ end.
+
+view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) ->
+ {ViewKVs, DocIdViewIdKeysAcc};
+view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) ->
+ {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []),
+ NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc],
+ view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys).
+
+
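+% Each emitted row is keyed by {Key, DocId} so identical keys from different
+% documents stay distinct in the view btree; the per-document back-index
+% entries ({ViewId, Key}) are what later lets write_changes/4 remove a
+% document's old rows when it is updated or deleted.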
+view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) ->
+ {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)};
+view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) ->
+ NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs],
+ NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs],
+ NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc],
+ NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc,
+ view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc).
+
+view_compute(Group, []) ->
+ {Group, []};
+view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) ->
+ {ok, QueryServer} =
+ case QueryServerIn of
+ nil -> % doc map not started
+ Definitions = [View#view.def || View <- Group#group.views],
+ couch_query_servers:start_doc_map(DefLang, Definitions);
+ _ ->
+ {ok, QueryServerIn}
+ end,
+ {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs),
+ {Group#group{query_server=QueryServer}, Results}.
+
+
+dict_find(Key, DefaultValue, Dict) ->
+ case dict:find(Key, Dict) of
+ {ok, Value} ->
+ Value;
+ error ->
+ DefaultValue
+ end.
+
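+% write_changes/4 sketch: a single couch_btree:query_modify/4 call updates the
+% doc-id back-index -- looking up the previous {ViewId, Key} entries for every
+% touched document while inserting the new ones and removing entries for
+% deleted docs -- and the lookup results seed the per-view removal dict used
+% by the add_remove calls below.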
+write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) ->
+ #group{id_btree=IdBtree} = Group,
+
+ AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []],
+ RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []],
+ LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys],
+
+ {ok, LookupResults, IdBtree2}
+ = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds),
+ KeysToRemoveByView = lists:foldl(
+ fun(LookupResult, KeysToRemoveByViewAcc) ->
+ case LookupResult of
+ {ok, {DocId, ViewIdKeys}} ->
+ lists:foldl(
+ fun({ViewId, Key}, KeysToRemoveByViewAcc2) ->
+ dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2)
+ end,
+ KeysToRemoveByViewAcc, ViewIdKeys);
+ {not_found, _} ->
+ KeysToRemoveByViewAcc
+ end
+ end,
+ dict:new(), LookupResults),
+
+ Views2 = [
+ begin
+ KeysToRemove = dict_find(View#view.id_num, [], KeysToRemoveByView),
+ {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove),
+ View#view{btree = ViewBtree2}
+ end
+ ||
+ {View, AddKeyValues} <- ViewKeyValuesToAdd
+ ],
+ Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2},
+ {ok, Group2}.
diff --git a/src/couchdb/mod_couch.erl b/src/couchdb/mod_couch.erl
new file mode 100644
index 00000000..78c0853a
--- /dev/null
+++ b/src/couchdb/mod_couch.erl
@@ -0,0 +1,891 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mod_couch).
+
+-include("couch_db.hrl").
+
+-export([do/1, load/2, url_decode/1]).
+
+-include_lib("../couch_inets/httpd.hrl").
+
+-record(uri_parts,
+ {db = "",
+ doc = "",
+ attachment = "",
+ view = "",
+ querystr = ""}).
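+
+% How request paths land in #uri_parts{} (examples derived from the
+% parse_uri/1 clauses below; all parts are url-decoded):
+%   /db                   -> db="db"
+%   /db/docid             -> db="db", doc="docid"
+%   /db/docid/att.txt     -> db="db", doc="docid", attachment="att.txt"
+%   /db/_design/foo       -> db="db", doc="_design/foo"
+%   /db/_view/foo/bar     -> db="db", doc="_design/foo", view="bar"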
+
+-record(doc_query_args,
+ {
+ options = [],
+ rev = "",
+ open_revs = ""
+ }).
+
+%% do. This is the main entry point into Apache CouchDB from the HTTP server
+
+do(Mod) ->
+ #mod{request_uri=Uri,request_line=Request, parsed_header=Header,entity_body=Body} = Mod,
+ PrevTrapExit = process_flag(trap_exit, true),
+ Resp =
+ case Uri of
+ "/_utils/" ++ RestURI ->
+ % if the URI is the utils directory, then this
+ % tells mod_get (a std HTTP module) where to serve the file from
+ DocumentRoot = httpd_util:lookup(Mod#mod.config_db, document_root, ""),
+ {Path, AfterPath} = httpd_util:split_path(DocumentRoot ++ "/" ++ RestURI),
+
+ case RestURI of
+ "" ->
+ Paths = httpd_util:split_path(DocumentRoot ++ "/index.html"),
+ {proceed, [{real_name, Paths} | Mod#mod.data]};
+ _ ->
+ case filelib:is_file(Path) of
+ true ->
+ {proceed, [{real_name, {Path, AfterPath}} | Mod#mod.data]};
+ false ->
+ case filelib:is_dir(Path) of
+ true ->
+                        % this ends up causing an "Internal Server Error"; need to fix.
+ {proceed, [{response,{403,"Forbidden"}}]};
+ false ->
+ {proceed, [{response,{404,"Not found"}}]}
+ end
+ end
+ end;
+ "/favicon.ico" ->
+ DocumentRoot = httpd_util:lookup(Mod#mod.config_db, document_root, ""),
+ RealName = DocumentRoot ++ "/" ++ Uri,
+ {Path, AfterPath} = httpd_util:split_path(RealName),
+ {proceed, [{real_name, {Path, AfterPath}} | Mod#mod.data]};
+ _ ->
+ couch_log:info("HTTP Request: ~s", [Request]),
+ couch_log:debug("Headers: ~p", [Header]),
+ couch_log:debug("Body: ~P", [Body, 100]),
+ case (catch parse_uri(Uri)) of
+ {ok, Parts} ->
+ {ok, ResponseCode} =
+ case (catch do(Mod, Parts)) of
+ {ok, ResponseCode0} ->
+ {ok, ResponseCode0};
+ Error ->
+ send_error(Mod, Error)
+ end;
+ Error ->
+ {ok, ResponseCode} = send_error(Mod, Error)
+ end,
+ couch_log:info("HTTP Response Code:~p~n", [ResponseCode]),
+ {proceed, [{response, {already_sent, ResponseCode, 0}} | Mod#mod.data]}
+ end,
+ process_flag(trap_exit, PrevTrapExit),
+ Resp.
+
+
+parse_uri(RequestUri) ->
+    % separate out the path and query portions and
+    % strip out the leading slash and question mark.
+ case regexp:split(RequestUri, "\\?") of
+ {ok, [[$/|UriPath], QueryStr]} -> ok;
+ {ok, [[$/|UriPath]]} -> QueryStr = ""
+ end,
+    % let's try to parse out the UriPath.
+ {ok, UrlParts} = regexp:split(UriPath, "/"),
+
+ {DbName, Id, Attachment, View} =
+ case UrlParts of
+ [Db] ->
+ {Db, "", "", ""};
+ [Db, "_design", Doc] ->
+ {Db, "_design/" ++ Doc, "", ""};
+ [Db, "_design", Doc, Attachment0] ->
+ {Db, "_design/" ++ Doc, Attachment0, ""};
+ [Db, "_view", Doc, ViewName] ->
+ {Db, "_design/" ++ Doc, "", ViewName};
+ [Db, "_view%2f" ++ Doc, ViewName] ->
+ {Db, "_design/" ++ Doc, "", ViewName};
+ [Db, Doc] ->
+ {Db, Doc, "", ""};
+ [Db, Doc, Attachment0] ->
+ {Db, Doc, Attachment0, ""};
+ _ ->
+ throw({invalid_uri, lists:flatten(io_lib:format("Uri has too many parts: ~p", [UrlParts]))})
+ end,
+ {ok, #uri_parts{db=url_decode(DbName),
+ doc=url_decode(Id),
+ attachment=url_decode(Attachment),
+ view=url_decode(View),
+ querystr=url_decode(QueryStr)}}.
+
+resp_json_header(Mod) ->
+ resp_json_header(Mod, []).
+
+% return the response header list for JSON content
+resp_json_header(Mod, Options) ->
+ Types = string:tokens(proplists:get_value("accept", Mod#mod.parsed_header, ""), ", "),
+ case lists:member("application/json", Types) of
+ true ->
+ resp_header(Mod, Options) ++ [{"content-type","application/json"}];
+ false ->
+ resp_header(Mod, Options) ++ [{"content-type","text/plain;charset=utf-8"}]
+ end.
+
+% return the common response header list
+resp_header(#mod{http_version=Version}, Options) ->
+ [{"cache-control", "no-cache"},
+ {"pragma", "no-cache"},
+ {"expires", httpd_util:rfc1123_date()}] ++
+ case lists:member(no_body, Options) of
+ true -> [];
+ false ->
+ case Version == "HTTP/1.1" of
+ true ->
+ [{"transfer-encoding", "chunked"}];
+ false ->
+ [{"connection", "close"}]
+ end
+ end.
+
+
+url_decode([$%, Hi, Lo | Tail]) ->
+ Hex = erlang:list_to_integer([Hi, Lo], 16),
+ xmerl_ucs:to_utf8([Hex]) ++ url_decode(Tail);
+url_decode([H|T]) ->
+ [H |url_decode(T)];
+url_decode([]) ->
+ [].
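+
+% For example (assuming the usual %XX escapes):
+%   url_decode("foo%2Fbar") -> "foo/bar"
+% Note that each %XX value is expanded through xmerl_ucs:to_utf8/1, so the
+% decoder treats escapes as Unicode code points rather than raw bytes.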
+
+
+send_header(Mod, RespCode, Headers) ->
+ couch_log:debug("HTTP Response Headers (code ~w): ~p", [RespCode, Headers]),
+ httpd_response:send_header(Mod, RespCode, Headers).
+
+send_chunk(Mod, Data) ->
+ httpd_response:send_chunk(Mod, Data, false).
+
+send_final_chunk(Mod) ->
+ httpd_response:send_final_chunk(Mod, false).
+
+show_couch_welcome(Mod) ->
+ send_header(Mod, 200, resp_json_header(Mod)),
+ send_chunk(Mod, "{\"couchdb\": \"Welcome\", "),
+ send_chunk(Mod, "\"version\": \"" ++ couch_server:get_version()),
+ send_chunk(Mod, "\"}\n"),
+ send_final_chunk(Mod),
+ {ok, 200}.
+
+
+do(#mod{method="GET"}=Mod, #uri_parts{db=""}) ->
+ show_couch_welcome(Mod);
+do(#mod{method="GET"}=Mod, #uri_parts{db="_all_dbs", doc=""}=Parts) ->
+ send_all_dbs(Mod, Parts);
+do(#mod{method="POST"}=Mod, #uri_parts{db="_replicate", doc=""}) ->
+ handle_replication_request(Mod);
+do(#mod{method="POST"}=Mod, #uri_parts{db="_restart", doc=""}) ->
+ couch_server:remote_restart(),
+ send_ok(Mod, 201);
+do(#mod{method="POST"}=Mod, #uri_parts{doc="_missing_revs"}=Parts) ->
+ handle_missing_revs_request(Mod, Parts);
+do(#mod{method="PUT"}=Mod, #uri_parts{doc=""}=Parts) ->
+ handle_db_create(Mod, Parts);
+do(#mod{method="DELETE"}=Mod, #uri_parts{doc=""}=Parts) ->
+ handle_db_delete(Mod, Parts);
+do(#mod{method="POST"}=Mod, #uri_parts{doc="_bulk_docs"}=Parts) ->
+ handle_bulk_doc_update(Mod, Parts);
+do(#mod{method="POST"}=Mod, #uri_parts{doc=""}=Parts) ->
+ handle_doc_post(Mod, Parts);
+do(#mod{method="PUT"}=Mod, Parts) ->
+ handle_doc_put(Mod, Parts);
+do(#mod{method="DELETE"}=Mod, Parts) ->
+ handle_doc_delete(Mod, Parts);
+do(#mod{method="POST"}=Mod, #uri_parts{doc="_temp_view"}=Parts) ->
+ send_temp_view(Mod, Parts);
+do(#mod{method="GET"}=Mod, #uri_parts{doc="_all_docs"}=Parts) ->
+ send_all_docs(Mod, Parts);
+do(#mod{method="GET"}=Mod, #uri_parts{doc="_all_docs_by_seq"}=Parts) ->
+ send_all_docs_by_seq(Mod, Parts);
+do(#mod{method="GET"}=Mod, #uri_parts{doc=""}=Parts) ->
+ send_database_info(Mod, Parts);
+do(#mod{method=Method}=Mod, #uri_parts{attachment="",view=""}=Parts)
+ when Method == "GET" orelse Method == "HEAD" ->
+ #doc_query_args{open_revs=Revs} = doc_parse_query(Parts#uri_parts.querystr),
+ case Revs of
+ [] ->
+ send_doc(Mod, Parts);
+ _ ->
+ send_doc_revs(Mod, Parts)
+ end;
+do(#mod{method=Method}=Mod, #uri_parts{attachment=Att}=Parts)
+ when Att /= "", Method == "GET" orelse Method == "HEAD" ->
+ send_attachment(Mod, Parts);
+do(#mod{method="GET"}=Mod, #uri_parts{view=View}=Parts) when View /= "" ->
+ send_view(Mod, Parts).
+
+handle_db_create(Mod, #uri_parts{db=DbName}) ->
+ case couch_server:create(DbName, []) of
+ {ok, _Db} ->
+ send_ok(Mod, 201);
+ {error, database_already_exists} ->
+ Msg = io_lib:format("Database ~p already exists.", [DbName]),
+ throw({database_already_exists, Msg});
+ Error ->
+ Msg = io_lib:format("Error creating database ~p: ~p", [DbName, Error]),
+ throw({unknown_error, Msg})
+ end.
+
+handle_db_delete(Mod, #uri_parts{db=DbName}) ->
+    % DELETE with no doc specified, so this is a database delete
+ case couch_server:delete(DbName) of
+ ok ->
+ send_ok(Mod, 202);
+ Error ->
+ throw(Error)
+ end.
+
+handle_bulk_doc_update(#mod{entity_body=RawBody}=Mod, Parts) ->
+ Options = [], % put options here.
+ Db = open_db(Parts),
+ {obj, JsonProps} = cjson:decode(RawBody),
+ DocsArray = proplists:get_value("docs", JsonProps),
+ % convert all the doc elements to native docs
+ case proplists:get_value("new_edits", JsonProps, true) of
+ true ->
+ Docs = lists:map(
+ fun({obj, ObjProps} = JsonObj) ->
+ Doc = couch_doc:from_json_obj(JsonObj),
+
+ Id =
+ case Doc#doc.id of
+ "" -> couch_util:new_uuid();
+ Id0 -> Id0
+ end,
+ Revs =
+ case proplists:get_value("_rev", ObjProps) of
+ undefined -> [];
+ Rev -> [Rev]
+ end,
+ Doc#doc{id=Id,revs=Revs}
+ end,
+ tuple_to_list(DocsArray)),
+
+ {ok, ResultRevs} = couch_db:update_docs(Db, Docs, Options),
+
+ % output the results
+ DocResults = lists:zipwith(
+ fun(Doc, NewRev) ->
+ {obj, [{"id", Doc#doc.id}, {"rev", NewRev}]}
+ end,
+ Docs, ResultRevs),
+ send_ok(Mod, 201, [{new_revs, list_to_tuple(DocResults)}]);
+
+ false ->
+ Docs = [couch_doc:from_json_obj(JsonObj) || JsonObj <- tuple_to_list(DocsArray)],
+ ok = couch_db:save_docs(Db, Docs, Options),
+ send_ok(Mod, 201)
+ end.
+
+
+
+
+doc_parse_query(QueryStr) ->
+ QueryList = httpd:parse_query(QueryStr),
+ lists:foldl(fun({Key,Value}, Args) ->
+ case {Key, Value} of
+ {"attachments", "true"} ->
+ Options = [attachments | Args#doc_query_args.options],
+ Args#doc_query_args{options=Options};
+ {"meta", "true"} ->
+ Options = [revs_info, conflicts, deleted_conflicts | Args#doc_query_args.options],
+ Args#doc_query_args{options=Options};
+ {"revs", "true"} ->
+ Options = [revs | Args#doc_query_args.options],
+ Args#doc_query_args{options=Options};
+ {"revs_info", "true"} ->
+ Options = [revs_info | Args#doc_query_args.options],
+ Args#doc_query_args{options=Options};
+ {"conflicts", "true"} ->
+ Options = [conflicts | Args#doc_query_args.options],
+ Args#doc_query_args{options=Options};
+ {"deleted_conflicts", "true"} ->
+ Options = [deleted_conflicts | Args#doc_query_args.options],
+ Args#doc_query_args{options=Options};
+ {"rev", Rev} ->
+ Args#doc_query_args{rev=Rev};
+ {"open_revs", "all"} ->
+ Args#doc_query_args{open_revs=all};
+ {"open_revs", RevsJsonStr} ->
+ JsonArray = cjson:decode(RevsJsonStr),
+ Args#doc_query_args{open_revs=tuple_to_list(JsonArray)};
+ _Else -> % unknown key value pair, ignore.
+ Args
+ end
+ end,
+ #doc_query_args{}, QueryList).
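+
+% For instance, a query string of "rev=123&attachments=true" (hypothetical
+% values) should yield, per the clauses above:
+%   #doc_query_args{rev="123", options=[attachments]}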
+
+
+handle_doc_post(#mod{entity_body=RawBody}=Mod, Parts) ->
+ Db = open_db(Parts),
+ Json = cjson:decode(RawBody),
+ Doc = couch_doc:from_json_obj(Json),
+ Id = couch_util:new_uuid(),
+ {ok, NewRevId} = couch_db:update_doc(Db, Doc#doc{id=Id, revs=[]}, []),
+ send_ok(Mod, 201, [{"id", Id}, {"rev", NewRevId}], [{"etag", NewRevId}]).
+
+handle_doc_put(#mod{parsed_header=Headers}=Mod,
+ #uri_parts{doc=Id, querystr=QueryStr}=Parts) ->
+ #doc_query_args{options=SaveOptions} = doc_parse_query(QueryStr),
+ Db = open_db(Parts),
+ {obj, ObjProps} = Json = cjson:decode(Mod#mod.entity_body),
+ Doc = couch_doc:from_json_obj(Json),
+ Etag = proplists:get_value("if-match", Headers, ""),
+ DocRev = proplists:get_value("_rev", ObjProps, ""),
+
+ if DocRev /= "" andalso Etag /= "" andalso DocRev /= Etag ->
+ throw({invalid_request, "Document rev and etag have different values"});
+ true -> ok
+ end,
+ Revs =
+ if DocRev /= "" -> [DocRev];
+ Etag /= "" -> [Etag];
+ true -> []
+ end,
+
+ {ok, NewRevId} = couch_db:update_doc(Db, Doc#doc{id=Id, revs=Revs}, SaveOptions),
+ send_ok(Mod, 201, [{"id", Id}, {"rev", NewRevId}],[{"etag", NewRevId}]).
+
+handle_doc_delete(#mod{parsed_header=Headers}=Mod,
+ #uri_parts{doc=Id, querystr=QueryStr}=Parts) ->
+ Db = open_db(Parts),
+ #doc_query_args{rev=QueryRev} = doc_parse_query(QueryStr),
+ Etag = proplists:get_value("if-match", Headers, ""),
+ RevToDelete =
+ case {QueryRev, Etag} of
+ {"", ""} ->
+ throw({missing_rev, "Document rev/etag must be specified to delete"});
+ {_, ""} ->
+ QueryRev;
+ {"", _} ->
+ Etag;
+ _ when QueryRev == Etag ->
+ Etag;
+ _ ->
+ throw({invalid_request, "Document rev and etag have different values"})
+ end,
+ {ok, NewRev} = couch_db:delete_doc(Db, Id, [RevToDelete]),
+ send_ok(Mod, 202, [{"id", Id}, {"rev", NewRev}]).
+
+
+-record(query_args,
+ {start_key = nil,
+ end_key = <<>>,
+    count = 10000000000, % a huge default number, picked so we don't need
+                         % separate logic for when there is no count limit
+ update = true,
+ direction = fwd,
+ start_docid = nil,
+ end_docid = <<>>,
+ skip = 0
+ }).
+
+reverse_key_default(nil) -> <<>>;
+reverse_key_default(<<>>) -> nil;
+reverse_key_default(Key) -> Key.
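+
+% nil and <<>> are the extreme sentinel keys: per the type ordering in
+% couch_view (see type_sort/1 there), atoms collate lowest and binaries
+% highest, so nil sits "before everything" and <<>> "after everything".
+% When a query flips direction the two defaults must swap, which is all
+% reverse_key_default/1 does.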
+
+view_parse_query(QueryStr) ->
+ QueryList = httpd:parse_query(QueryStr),
+ lists:foldl(fun({Key,Value}, Args) ->
+ case {Key, Value} of
+ {"", _} ->
+ Args;
+ {"key", Value} ->
+ JsonKey = cjson:decode(Value),
+ Args#query_args{start_key=JsonKey,end_key=JsonKey};
+ {"startkey_docid", DocId} ->
+ Args#query_args{start_docid=DocId};
+ {"startkey", Value} ->
+ Args#query_args{start_key=cjson:decode(Value)};
+ {"endkey", Value} ->
+ Args#query_args{end_key=cjson:decode(Value)};
+ {"count", Value} ->
+ case (catch list_to_integer(Value)) of
+ Count when is_integer(Count) ->
+ if Count < 0 ->
+ Args#query_args {
+ direction =
+ if Args#query_args.direction == rev -> fwd;
+ true -> rev
+ end,
+ count=Count,
+ start_key = reverse_key_default(Args#query_args.start_key),
+ start_docid = reverse_key_default(Args#query_args.start_docid),
+ end_key = reverse_key_default(Args#query_args.end_key),
+ end_docid = reverse_key_default(Args#query_args.end_docid)};
+ true ->
+ Args#query_args{count=Count}
+ end;
+ _Error ->
+ Msg = io_lib:format("Bad URL query value, number expected: count=~s", [Value]),
+ throw({query_parse_error, Msg})
+ end;
+ {"update", "false"} ->
+ Args#query_args{update=false};
+ {"descending", "true"} ->
+ case Args#query_args.direction of
+ fwd ->
+ Args#query_args {
+ direction = rev,
+ start_key = reverse_key_default(Args#query_args.start_key),
+ start_docid = reverse_key_default(Args#query_args.start_docid),
+ end_key = reverse_key_default(Args#query_args.end_key),
+ end_docid = reverse_key_default(Args#query_args.end_docid)};
+ _ ->
+ Args %already reversed
+ end;
+ {"skip", Value} ->
+ case (catch list_to_integer(Value)) of
+ Count when is_integer(Count) ->
+ Args#query_args{skip=Count};
+ _Error ->
+ Msg = lists:flatten(io_lib:format(
+ "Bad URL query value, number expected: skip=~s", [Value])),
+ throw({query_parse_error, Msg})
+ end;
+ _ -> % unknown key
+ Msg = lists:flatten(io_lib:format(
+ "Bad URL query key:~s", [Key])),
+ throw({query_parse_error, Msg})
+ end
+ end,
+ #query_args{}, QueryList).
+
+
+% returns a Db record, otherwise throws an exception. Note: no {ok, _} wrapper.
+open_db(#uri_parts{db=DbName}) ->
+ open_db(DbName);
+open_db(DbName) when is_list(DbName)->
+ case couch_server:open(DbName) of
+ {ok, Db} ->
+ Db;
+ Error ->
+ throw(Error)
+ end.
+
+handle_missing_revs_request(#mod{entity_body=RawJson}=Mod, Parts) ->
+ Db = open_db(Parts),
+ {obj, JsonDocIdRevs} = cjson:decode(RawJson),
+ DocIdRevs = [{Id, tuple_to_list(Revs)} || {Id, Revs} <- JsonDocIdRevs],
+ {ok, Results} = couch_db:get_missing_revs(Db, DocIdRevs),
+ JsonResults = [{Id, list_to_tuple(Revs)} || {Id, Revs} <- Results],
+ send_json(Mod, 200, {obj, [{missing_revs, {obj, JsonResults}}]}).
+
+handle_replication_request(#mod{entity_body=RawJson}=Mod) ->
+ {obj, Props} = cjson:decode(RawJson),
+ Src = proplists:get_value("source", Props),
+ Tgt = proplists:get_value("target", Props),
+ {obj, Options} = proplists:get_value("options", Props, {obj, []}),
+ {ok, {obj, JsonResults}} = couch_rep:replicate(Src, Tgt, Options),
+ send_ok(Mod, 200, JsonResults).
+
+
+
+send_database_info(Mod, #uri_parts{db=DbName}=Parts) ->
+ Db = open_db(Parts),
+ {ok, InfoList} = couch_db:get_db_info(Db),
+ ok = send_header(Mod, 200, resp_json_header(Mod)),
+ DocCount = proplists:get_value(doc_count, InfoList),
+ LastUpdateSequence = proplists:get_value(last_update_seq, InfoList),
+ ok = send_chunk(Mod, "{\"db_name\": \"" ++ DbName ++
+ "\", \"doc_count\":" ++ integer_to_list(DocCount) ++
+ ", \"update_seq\":" ++ integer_to_list(LastUpdateSequence)++"}"),
+ ok = send_final_chunk(Mod),
+ {ok, 200}.
+
+send_doc(#mod{parsed_header=Headers}=Mod,
+ #uri_parts{doc=DocId,querystr=QueryStr}=Parts) ->
+ Db = open_db(Parts),
+ #doc_query_args{rev=Rev, options=Options} = doc_parse_query(QueryStr),
+ case Rev of
+ "" ->
+ % open most recent rev
+ case couch_db:open_doc(Db, DocId, Options) of
+ {ok, #doc{revs=[DocRev|_]}=Doc} ->
+ Etag = proplists:get_value("if-none-match", Headers),
+ if Options == [] andalso Etag == DocRev ->
+ ok = send_header(Mod, 304,
+ resp_header(Mod, [no_body]) ++ [{"etag", DocRev}]),
+ {ok, 304};
+ true ->
+ send_json(Mod, 200, couch_doc:to_json_obj(Doc, Options),
+ if Options == [] -> [{"etag", DocRev}]; true -> [] end)
+ end;
+ Error ->
+ throw(Error)
+ end;
+ _ ->
+ % open a specific rev (deletions come back as stubs)
+ case couch_db:open_doc_revs(Db, DocId, [Rev], Options) of
+ {ok, [{ok, Doc}]} ->
+ send_json(Mod, 200, couch_doc:to_json_obj(Doc, Options), [{"etag", Rev}]);
+ {ok, [Else]} ->
+ throw(Else)
+ end
+ end.
+
+send_doc_revs(Mod, #uri_parts{doc=DocId,querystr=QueryStr}=Parts) ->
+ Db = open_db(Parts),
+ #doc_query_args{options=Options, open_revs=Revs} = doc_parse_query(QueryStr),
+ {ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options),
+ ok = send_header(Mod, 200, resp_json_header(Mod)),
+ ok = send_chunk(Mod, "["),
+    % We loop through the docs. The first time through, the separator is the
+    % empty string; on subsequent iterations it is a comma.
+ lists:foldl(
+ fun(Result, AccSeparator) ->
+ case Result of
+ {ok, Doc} ->
+ JsonDoc= couch_doc:to_json_obj(Doc, Options),
+ ok = send_chunk(Mod, AccSeparator ++ lists:flatten(cjson:encode({obj, [{ok, JsonDoc}]})));
+ {{not_found, missing}, RevId} ->
+ Json = {obj, [{"missing", RevId}]},
+ ok = send_chunk(Mod, AccSeparator ++ lists:flatten(cjson:encode(Json)))
+ end,
+ "," % AccSeparator now has a comma
+ end,
+ "", Results),
+ ok = send_chunk(Mod, "]"),
+ ok = send_final_chunk(Mod),
+ {ok, 200}.
+
+send_attachment(#mod{method=Method} = Mod,
+ #uri_parts{doc=DocId,attachment=Attachment}=Parts) ->
+ Db = open_db(Parts),
+ case couch_db:open_doc(Db, DocId, []) of
+ {ok, #doc{attachments=Attachments}} ->
+ case proplists:get_value(Attachment, Attachments) of
+ undefined ->
+ throw({not_found, missing});
+ {Type, Bin} ->
+ ok = send_header(Mod, 200, resp_header(Mod, Type) ++
+ [{"content-type", Type},
+ {"content-length", integer_to_list(couch_doc:bin_size(Bin))}]),
+ case Method of
+ "GET" ->
+ couch_doc:bin_foldl(Bin,
+ fun(BinSegment, []) ->
+ ok = send_chunk(Mod, BinSegment),
+ {ok, []}
+ end,
+ []);
+ "HEAD" ->
+ ok
+ end,
+ ok = send_final_chunk(Mod),
+ {ok, 200}
+ end;
+ Error ->
+ throw(Error)
+ end.
+
+
+send_json(Mod, Code, JsonData) ->
+ send_json(Mod, Code, JsonData, []).
+
+send_json(#mod{method=Method}=Mod, Code, JsonData, Headers) ->
+ case Method of
+ "HEAD" ->
+ ok = send_header(Mod, Code, resp_json_header(Mod, [no_body]) ++ Headers);
+ _ ->
+ ok = send_header(Mod, Code, resp_json_header(Mod) ++ Headers),
+ ok = send_chunk(Mod, lists:flatten([cjson:encode(JsonData) | "\n"])),
+ ok = send_final_chunk(Mod)
+ end,
+ {ok, Code}.
+
+
+send_ok(Mod, Code) ->
+ send_ok(Mod, Code, []).
+
+send_ok(Mod, Code, AdditionalProps) ->
+ send_ok(Mod, Code, AdditionalProps, []).
+
+send_ok(Mod, Code, AdditionalProps, AdditionalHeaders) ->
+ send_json(Mod, Code, {obj, [{ok, true}|AdditionalProps]}, AdditionalHeaders).
+
+
+make_view_fold_fun(Mod, QueryArgs) ->
+ #query_args{
+ end_key=EndKey,
+ end_docid=EndDocId,
+ direction=Dir,
+ count=Count
+ } = QueryArgs,
+
+ PassedEndFun =
+ case Dir of
+ fwd ->
+ fun(ViewKey, ViewId) ->
+ couch_view:less_json({EndKey,EndDocId}, {ViewKey,ViewId})
+ end;
+ rev->
+ fun(ViewKey, ViewId) ->
+ couch_view:less_json({ViewKey, ViewId}, {EndKey,EndDocId})
+ end
+ end,
+
+ NegCountFun =
+ fun(Id, Key, Value, Offset, TotalViewCount, {AccCount, AccSkip, HeaderSent, AccRevRows}) ->
+ PassedEnd = PassedEndFun(Key, Id),
+ case {PassedEnd, AccCount, AccSkip, HeaderSent} of
+ {true,_,_,_} ->
+ % The stop key has been passed, stop looping.
+ {stop, {AccCount, AccSkip, HeaderSent, AccRevRows}};
+ {_,0,_,_} ->
+            {stop, {0, 0, HeaderSent, AccRevRows}}; % we've done "count" rows, stop folding
+ {_,_,AccSkip,_} when AccSkip > 0 ->
+ {ok, {AccCount, AccSkip - 1, HeaderSent, AccRevRows}};
+ {_,AccCount,_,header_sent} ->
+ JsonObj = {obj, [{"key",Key},{"id",Id},{"value",Value}]},
+ {ok, {AccCount + 1, 0, header_sent, [cjson:encode(JsonObj), "," | AccRevRows]}};
+ {_,_,_,header_not_sent} ->
+ ok = send_header(Mod, 200, resp_json_header(Mod)),
+            Offset2 = TotalViewCount - Offset -
+ lists:min([TotalViewCount - Offset, - AccCount]),
+ JsonBegin = io_lib:format("{\"total_rows\":~w,\"offset\":~w,\"rows\":[",
+ [TotalViewCount, Offset2]),
+ JsonObj = {obj, [{"key",Key},{"id",Id},{"value",Value}]},
+ ok = send_chunk(Mod, lists:flatten(JsonBegin)),
+ {ok, {AccCount + 1, 0, header_sent, [cjson:encode(JsonObj) | AccRevRows]}}
+ end
+ end,
+
+ PosCountFun =
+ fun(Id, Key, Value, Offset, TotalViewCount, {AccCount, AccSkip, HeaderSent, AccRevRows}) ->
+ PassedEnd = PassedEndFun(Key, Id),
+ case {PassedEnd, AccCount, AccSkip, HeaderSent} of
+ {true,_,_,_} ->
+ % The stop key has been passed, stop looping.
+ {stop, {AccCount, AccSkip, HeaderSent, AccRevRows}};
+ {_,0,_,_} ->
+            {stop, {0, 0, HeaderSent, AccRevRows}}; % we've done "count" rows, stop folding
+ {_,_,AccSkip,_} when AccSkip > 0 ->
+ {ok, {AccCount, AccSkip - 1, HeaderSent, AccRevRows}};
+ {_,AccCount,_,header_sent} when (AccCount > 0) ->
+ JsonObj = {obj, [{"key",Key},{"id",Id},{"value",Value}]},
+ ok = send_chunk(Mod, "," ++ lists:flatten(cjson:encode(JsonObj))),
+ {ok, {AccCount - 1, 0, header_sent, AccRevRows}};
+ {_,_,_,header_not_sent} ->
+ ok = send_header(Mod, 200, resp_json_header(Mod)),
+ JsonBegin = io_lib:format("{\"total_rows\":~w,\"offset\":~w,\"rows\":[",
+ [TotalViewCount, Offset]),
+ JsonObj = {obj, [{"key",Key},{"id",Id},{"value",Value}]},
+ ok = send_chunk(Mod, lists:flatten(JsonBegin ++ cjson:encode(JsonObj))),
+ {ok, {AccCount - 1, 0, header_sent, AccRevRows}}
+ end
+ end,
+ case Count > 0 of
+ true -> PosCountFun;
+ false -> NegCountFun
+ end.
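+
+% Note on the two folds: PosCountFun streams each row out as soon as it is
+% seen, while NegCountFun (used for a negative count, i.e. a reversed query)
+% prepends rows to AccRevRows and lets finish_view_fold/2 emit them at the
+% end, so the client still receives the rows in forward key order.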
+
+finish_view_fold(Mod, FoldResult) ->
+ case FoldResult of
+ {ok, TotalRows, {_, _, header_not_sent, _}} ->
+ % nothing found in the view, nothing has been returned
+ % send empty view
+ ok = send_header(Mod, 200, resp_json_header(Mod)),
+ JsonEmptyView = lists:flatten(
+ io_lib:format("{\"total_rows\":~w,\"rows\":[]}\n",
+ [TotalRows])),
+ ok = send_chunk(Mod, JsonEmptyView),
+ ok = send_final_chunk(Mod),
+ {ok, 200};
+ {ok, _TotalRows, {_, _, header_sent, AccRevRows}} ->
+ % end the view
+ ok = send_chunk(Mod, lists:flatten(AccRevRows) ++ "]}\n"),
+ ok = send_final_chunk(Mod),
+ {ok, 200};
+ Error ->
+ throw(Error)
+ end.
+
+
+send_temp_view(#mod{entity_body=Body,parsed_header=Headers}=Mod,
+ #uri_parts{db=DbName, querystr=QueryStr}) ->
+ #query_args{
+ start_key=StartKey,
+ count=Count,
+ skip=SkipCount,
+ direction=Dir,
+ start_docid=StartDocId} = QueryArgs = view_parse_query(QueryStr),
+ Type0 = proplists:get_value("content-type", Headers, "text/javascript"),
+    % remove the charset ("...;charset=foo") if it's there
+ {ok, [Type|_]} = regexp:split(Type0, ";"),
+ View = {temp, DbName, Type, Body},
+ Start = {StartKey, StartDocId},
+ FoldlFun = make_view_fold_fun(Mod, QueryArgs),
+ FoldAccInit = {Count, SkipCount, header_not_sent, []},
+ FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit),
+ finish_view_fold(Mod, FoldResult).
+
+
+send_view(Mod, #uri_parts{db=DbName, doc=DesignDocId, view=ViewId, querystr=QueryStr}) ->
+ #query_args{
+ start_key=StartKey,
+ count=Count,
+ skip=SkipCount,
+ direction=Dir,
+ start_docid=StartDocId} = QueryArgs = view_parse_query(QueryStr),
+ View = {DbName, DesignDocId, ViewId},
+ Start = {StartKey, StartDocId},
+ FoldlFun = make_view_fold_fun(Mod, QueryArgs),
+ FoldAccInit = {Count, SkipCount, header_not_sent, []},
+ Result = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit),
+ finish_view_fold(Mod, Result).
+
+
+send_all_docs(Mod, #uri_parts{querystr=QueryStr}=Parts) ->
+ Db = open_db(Parts),
+ #query_args{
+ start_key=StartKey,
+ start_docid=StartDocId,
+ count=Count,
+ skip=SkipCount,
+ direction=Dir} = QueryArgs = view_parse_query(QueryStr),
+ {ok, Info} = couch_db:get_db_info(Db),
+ TotalRowCount = proplists:get_value(doc_count, Info),
+
+ StartId =
+ if is_list(StartKey) -> StartKey;
+ true -> StartDocId
+ end,
+
+ FoldlFun = make_view_fold_fun(Mod, QueryArgs),
+ AdapterFun =
+ fun(#full_doc_info{id=Id}=FullDocInfo, Offset, Acc) ->
+ case couch_doc:to_doc_info(FullDocInfo) of
+ #doc_info{deleted=false, rev=Rev} ->
+ FoldlFun(Id, Id, {obj, [{rev, Rev}]}, Offset, TotalRowCount, Acc);
+ #doc_info{deleted=true} ->
+ {ok, Acc}
+ end
+ end,
+ {ok, FoldResult} = couch_db:enum_docs(Db, StartId, Dir, AdapterFun,
+ {Count, SkipCount, header_not_sent, []}),
+ finish_view_fold(Mod, {ok, TotalRowCount, FoldResult}).
+
+send_all_docs_by_seq(Mod, #uri_parts{querystr=QueryStr}=Parts) ->
+ Db = open_db(Parts),
+ QueryArgs = view_parse_query(QueryStr),
+ #query_args{
+ start_key=StartKey,
+ count=Count,
+ skip=SkipCount,
+ direction=Dir} = QueryArgs,
+
+ {ok, Info} = couch_db:get_db_info(Db),
+ TotalRowCount = proplists:get_value(doc_count, Info),
+
+ FoldlFun = make_view_fold_fun(Mod, QueryArgs),
+
+ StartKey2 =
+ case StartKey of
+ nil -> 0;
+ <<>> -> 100000000000;
+ StartKey when is_integer(StartKey) -> StartKey
+ end,
+ {ok, FoldResult} =
+ couch_db:enum_docs_since(Db, StartKey2, Dir,
+ fun(DocInfo, Offset, Acc) ->
+ #doc_info{
+ id=Id,
+ rev=Rev,
+ update_seq=UpdateSeq,
+ deleted=Deleted,
+ conflict_revs=ConflictRevs,
+ deleted_conflict_revs=DelConflictRevs} = DocInfo,
+ Json =
+ {obj,
+ [{"rev", Rev}] ++
+ case ConflictRevs of
+ [] -> [];
+ _ -> [{"conflicts", list_to_tuple(ConflictRevs)}]
+ end ++
+ case DelConflictRevs of
+ [] -> [];
+ _ -> [{"deleted_conflicts", list_to_tuple(DelConflictRevs)}]
+ end ++
+ case Deleted of
+ true -> [{"deleted", true}];
+ false -> []
+ end
+ },
+ FoldlFun(Id, UpdateSeq, Json, Offset, TotalRowCount, Acc)
+ end, {Count, SkipCount, header_not_sent, []}),
+ finish_view_fold(Mod, {ok, TotalRowCount, FoldResult}).
+
+
+
+send_all_dbs(Mod, _Parts)->
+ {ok, DbNames} = couch_server:all_databases(),
+ ok = send_header(Mod, 200, resp_json_header(Mod)),
+ ok = send_chunk(Mod, lists:flatten(cjson:encode(list_to_tuple(DbNames)))),
+ ok = send_final_chunk(Mod),
+ {ok, 200}.
+
+send_error(Mod, Error) ->
+ {Json, Code} = error_to_json(Error),
+ couch_log:info("HTTP Error (code ~w): ~p", [Code, Json]),
+ send_json(Mod, Code, Json).
+
+
+
+% convert an error response into a json object and http error code.
+error_to_json(Error) ->
+ {HttpCode, Atom, Reason} = error_to_json0(Error),
+ Reason1 =
+ case (catch io_lib:format("~s", [Reason])) of
+ Reason0 when is_list(Reason0) ->
+ lists:flatten(Reason0);
+ _ ->
+ lists:flatten(io_lib:format("~p", [Reason])) % else term to text
+ end,
+ Json =
+ {obj,
+ [{error, atom_to_list(Atom)},
+ {reason, Reason1}]},
+ {Json, HttpCode}.
+
+error_to_json0(not_found) ->
+ {404, not_found, "missing"};
+error_to_json0({missing_rev, Msg}) ->
+ {412, missing_rev, Msg};
+error_to_json0({not_found, Reason}) ->
+ {404, not_found, Reason};
+error_to_json0({database_already_exists, Reason}) ->
+ {409, database_already_exists, Reason};
+error_to_json0(conflict) ->
+ {412, conflict, "Update conflict"};
+error_to_json0({doc_validation, Msg}) ->
+ {406, doc_validation, Msg};
+error_to_json0({Id, Reason}) when is_atom(Id) ->
+ {500, Id, Reason};
+error_to_json0(Error) ->
+ {500, error, Error}.
+
+%%
+%% Configuration
+%%
+
+%% load
+
+load("Foo Bar", []) ->
+ {ok, [], {script_alias, {"foo", "bar"}}}.