diff options
author | Christopher Lenz <cmlenz@apache.org> | 2008-03-28 23:32:19 +0000 |
---|---|---|
committer | Christopher Lenz <cmlenz@apache.org> | 2008-03-28 23:32:19 +0000 |
commit | 544a38dd45f6a58d34296c6c768afd086eb2ac70 (patch) | |
tree | c84cc02340b06aae189cff0dbfaee698f273f1f5 /src/couchdb | |
parent | 804cbbe033b8e7a3e8d7058aaf31bdf69ef18ac5 (diff) |
Imported trunk.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@642432 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/Makefile.am | 97 | ||||
-rw-r--r-- | src/couchdb/cjson.erl | 567 | ||||
-rw-r--r-- | src/couchdb/couch.app.tpl.in | 29 | ||||
-rw-r--r-- | src/couchdb/couch_btree.erl | 590 | ||||
-rw-r--r-- | src/couchdb/couch_db.erl | 757 | ||||
-rw-r--r-- | src/couchdb/couch_db.hrl | 56 | ||||
-rw-r--r-- | src/couchdb/couch_db_update_notifier.erl | 66 | ||||
-rw-r--r-- | src/couchdb/couch_doc.erl | 199 | ||||
-rw-r--r-- | src/couchdb/couch_erl_driver.c | 160 | ||||
-rw-r--r-- | src/couchdb/couch_event_sup.erl | 69 | ||||
-rw-r--r-- | src/couchdb/couch_file.erl | 323 | ||||
-rw-r--r-- | src/couchdb/couch_ft_query.erl | 78 | ||||
-rw-r--r-- | src/couchdb/couch_js.c | 452 | ||||
-rw-r--r-- | src/couchdb/couch_key_tree.erl | 139 | ||||
-rw-r--r-- | src/couchdb/couch_log.erl | 130 | ||||
-rw-r--r-- | src/couchdb/couch_query_servers.erl | 206 | ||||
-rw-r--r-- | src/couchdb/couch_rep.erl | 308 | ||||
-rw-r--r-- | src/couchdb/couch_server.erl | 215 | ||||
-rw-r--r-- | src/couchdb/couch_server_sup.erl | 185 | ||||
-rw-r--r-- | src/couchdb/couch_stream.erl | 252 | ||||
-rw-r--r-- | src/couchdb/couch_util.erl | 316 | ||||
-rw-r--r-- | src/couchdb/couch_view.erl | 616 | ||||
-rw-r--r-- | src/couchdb/mod_couch.erl | 891 |
23 files changed, 6701 insertions, 0 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am new file mode 100644 index 00000000..bf0c31bc --- /dev/null +++ b/src/couchdb/Makefile.am @@ -0,0 +1,97 @@ +## Licensed under the Apache License, Version 2.0 (the "License"); you may not +## use this file except in compliance with the License. You may obtain a copy +## of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +## License for the specific language governing permissions and limitations under +## the License. + +datarootdir = @prefix@/share + +ICU_LOCAL_FLAGS = $(ICU_LOCAL_CFLAGS) $(ICU_LOCAL_LDFLAGS) + +couchprivlibdir = $(erlanglibdir)/couch-$(version)/priv/lib + +couchprivlib_LTLIBRARIES = couch_erl_driver.la +couch_erl_driver_la_SOURCES = couch_erl_driver.c +couch_erl_driver_la_LDFLAGS = -module -avoid-version $(ICU_LOCAL_FLAGS) +couch_erl_driver_la_CFLAGS = $(ICU_LOCAL_FLAGS) +couch_erl_driver_la_LIBADD = -licuuc -licudata -licui18n + +libbin_PROGRAMS = couchjs +couchjs_SOURCES = couch_js.c + +couchebindir = $(erlanglibdir)/couch-$(version)/ebin +couchincludedir = $(erlanglibdir)/couch-$(version)/include + +couch_file_collection = \ + cjson.erl \ + couch_btree.erl \ + couch_db.erl \ + couch_db_update_notifier.erl \ + couch_doc.erl \ + couch_event_sup.erl \ + couch_file.erl \ + couch_ft_query.erl \ + couch_key_tree.erl \ + couch_log.erl \ + couch_query_servers.erl \ + couch_rep.erl \ + couch_server.erl \ + couch_server_sup.erl \ + couch_stream.erl \ + couch_util.erl \ + couch_view.erl \ + mod_couch.erl + +couchebin_DATA = \ + cjson.beam \ + couch.app \ + couch_btree.beam \ + couch_db.beam \ + couch_db_update_notifier.beam \ + couch_doc.beam \ + couch_event_sup.beam \ + couch_file.beam \ + couch_ft_query.beam \ + couch_key_tree.beam \ + couch_log.beam \ + couch_query_servers.beam \ + couch_rep.beam \ + couch_server.beam \ + couch_server_sup.beam \ + couch_stream.beam \ + couch_util.beam \ + couch_view.beam \ + mod_couch.beam + +couchinclude_DATA = couch_db.hrl + +EXTRA_DIST = $(couch_file_collection) $(couchinclude_DATA) + +CLEANFILES = $(couchebin_DATA) + +couch.app: couch.app.tpl + sed -e "s|%package_name%|@package_name@|g" \ + -e "s|%version%|@version@|g" > \ + $@ < $< + chmod +x $@ + +%.beam: %.erl + erlc $< + +install-data-hook: + if test -f "$(DESTDIR)/$(couchprivlibdir)/couch_erl_driver"; then \ + rm -f "$(DESTDIR)/$(couchprivlibdir)/couch_erl_driver.so"; \ + cd "$(DESTDIR)/$(couchprivlibdir)" && \ + $(LN_S) couch_erl_driver couch_erl_driver.so; \ + fi + +uninstall-local: + if test -f "$(DESTDIR)/$(couchprivlibdir)/couch_erl_driver"; then \ + rm -f "$(DESTDIR)/$(couchprivlibdir)/couch_erl_driver.so"; \ + fi diff --git a/src/couchdb/cjson.erl b/src/couchdb/cjson.erl new file mode 100644 index 00000000..042d5c41 --- /dev/null +++ b/src/couchdb/cjson.erl @@ -0,0 +1,567 @@ +%% @author Bob Ippolito <bob@mochimedia.com> +%% @copyright 2006 Mochi Media, Inc. +%% +%% Permission is hereby granted, free of charge, to any person +%% obtaining a copy of this software and associated documentation +%% files (the "Software"), to deal in the Software without restriction, +%% including without limitation the rights to use, copy, modify, merge, +%% publish, distribute, sublicense, and/or sell copies of the Software, +%% and to permit persons to whom the Software is furnished to do +%% so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included +%% in all copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +%% MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +%% IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +%% CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +%% TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +%% SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +%% @doc Yet another JSON (RFC 4627) library for Erlang. + +-module(cjson). +-author('bob@mochimedia.com'). +-export([encoder/1, encode/1]). +-export([decoder/1, decode/1]). +-export([test/0]). + +% +% NOTE: This file was originally mochijson.erl and has been adapted for +% use with CouchDB. +% +% The changes are: +% {array, [...]} +% is now +% {...} +% and: +% {struct, [...]} +% is now +% {obj, [...]} +% + +% This is a macro to placate syntax highlighters.. +-define(Q, $\"). +-define(ADV_COL(S, N), S#decoder{column=N+S#decoder.column}). +-define(INC_COL(S), S#decoder{column=1+S#decoder.column}). +-define(INC_LINE(S), S#decoder{column=1, line=1+S#decoder.line}). + +%% @type iolist() = [char() | binary() | iolist()] +%% @type iodata() = iolist() | binary() +%% @type json_string() = atom | string() | binary() +%% @type json_number() = integer() | float() +%% @type json_array() = {json_term()} +%% @type json_object() = {struct, [{json_string(), json_term()}]} +%% @type json_term() = json_string() | json_number() | json_array() | +%% json_object() +%% @type encoding() = utf8 | unicode +%% @type encoder_option() = {input_encoding, encoding()} | +%% {handler, function()} +%% @type decoder_option() = {input_encoding, encoding()} | +%% {object_hook, function()} + +-record(encoder, {input_encoding=utf8, + handler=null}). + +-record(decoder, {input_encoding=utf8, + object_hook=null, + line=1, + column=1, + state=null}). + +%% @spec encoder([encoder_option()]) -> function() +%% @doc Create an encoder/1 with the given options. +encoder(Options) -> + State = parse_encoder_options(Options, #encoder{}), + fun (O) -> json_encode(O, State) end. + +%% @spec encode(json_term()) -> iolist() +%% @doc Encode the given as JSON to an iolist. +encode(Any) -> + json_encode(Any, #encoder{}). + +%% @spec decoder([decoder_option()]) -> function() +%% @doc Create a decoder/1 with the given options. +decoder(Options) -> + State = parse_decoder_options(Options, #decoder{}), + fun (O) -> json_decode(O, State) end. + +%% @spec decode(iolist()) -> json_term() +%% @doc Decode the given iolist to Erlang terms. +decode(S) -> + json_decode(S, #decoder{}). + +test() -> + test_all(). + +%% Internal API + +parse_encoder_options([], State) -> + State; +parse_encoder_options([{input_encoding, Encoding} | Rest], State) -> + parse_encoder_options(Rest, State#encoder{input_encoding=Encoding}); +parse_encoder_options([{handler, Handler} | Rest], State) -> + parse_encoder_options(Rest, State#encoder{handler=Handler}). + +parse_decoder_options([], State) -> + State; +parse_decoder_options([{input_encoding, Encoding} | Rest], State) -> + parse_decoder_options(Rest, State#decoder{input_encoding=Encoding}); +parse_decoder_options([{object_hook, Hook} | Rest], State) -> + parse_decoder_options(Rest, State#decoder{object_hook=Hook}). + + +format_float(F) -> + format_float1(lists:reverse(float_to_list(F)), []). + +format_float1([$0, $0, _, $e | Rest], []) -> + strip_zeros(Rest, []); +format_float1([Sign, $e | Rest], Acc) -> + strip_zeros(Rest, [$e, Sign | Acc]); +format_float1([C | Rest], Acc) -> + format_float1(Rest, [C | Acc]). + +strip_zeros(L=[$0, $. | _], Acc) -> + lists:reverse(L, Acc); +strip_zeros([$0 | Rest], Acc) -> + strip_zeros(Rest, Acc); +strip_zeros(L, Acc) -> + lists:reverse(L, Acc). + +json_encode(true, _State) -> + "true"; +json_encode(false, _State) -> + "false"; +json_encode(null, _State) -> + "null"; +json_encode(I, _State) when is_integer(I) -> + integer_to_list(I); +json_encode(F, _State) when is_float(F) -> + format_float(F); +json_encode(L, State) when is_list(L); is_binary(L); is_atom(L) -> + json_encode_string(L, State); +json_encode({obj, Props}, State) when is_list(Props) -> + json_encode_proplist(Props, State); +json_encode(Array, State) when is_tuple(Array) -> + json_encode_array(Array, State); +json_encode(Bad, #encoder{handler=null}) -> + exit({json_encode, {bad_term, Bad}}); +json_encode(Bad, State=#encoder{handler=Handler}) -> + json_encode(Handler(Bad), State). + +json_encode_array({}, _State) -> + "[]"; +json_encode_array(Tuple, State) -> + F = fun (O, Acc) -> + [$,, json_encode(O, State) | Acc] + end, + [$, | Acc1] = lists:foldl(F, "[", tuple_to_list(Tuple)), + lists:reverse([$\] | Acc1]). + +json_encode_proplist([], _State) -> + "{}"; +json_encode_proplist(Props, State) -> + F = fun ({K, V}, Acc) -> + KS = case K of + K when is_atom(K) -> + json_encode_string_utf8(atom_to_list(K), [?Q]); + K when is_integer(K) -> + json_encode_string(integer_to_list(K), State); + K when is_list(K); is_binary(K) -> + json_encode_string(K, State) + end, + VS = json_encode(V, State), + [$,, VS, $:, KS | Acc] + end, + [$, | Acc1] = lists:foldl(F, "{", Props), + lists:reverse([$\} | Acc1]). + +json_encode_string(A, _State) when is_atom(A) -> + json_encode_string_unicode(xmerl_ucs:from_utf8(atom_to_list(A)), [?Q]); +json_encode_string(B, _State) when is_binary(B) -> + json_encode_string_unicode(xmerl_ucs:from_utf8(B), [?Q]); +json_encode_string(S, #encoder{input_encoding=utf8}) -> + json_encode_string_utf8(S, [?Q]); +json_encode_string(S, #encoder{input_encoding=unicode}) -> + json_encode_string_unicode(S, [?Q]). + +json_encode_string_utf8([], Acc) -> + lists:reverse([$\" | Acc]); +json_encode_string_utf8(All=[C | Cs], Acc) -> + case C of + C when C >= 16#7f -> + json_encode_string_unicode(xmerl_ucs:from_utf8(All), Acc); + _ -> + Acc1 = case C of + ?Q -> + [?Q, $\\ | Acc]; + $/ -> + [$/, $\\ | Acc]; + $\\ -> + [$\\, $\\ | Acc]; + $\b -> + [$b, $\\ | Acc]; + $\f -> + [$f, $\\ | Acc]; + $\n -> + [$n, $\\ | Acc]; + $\r -> + [$r, $\\ | Acc]; + $\t -> + [$t, $\\ | Acc]; + C when C >= 0, C < $\s -> + [unihex(C) | Acc]; + C when C >= $\s -> + [C | Acc]; + _ -> + exit({json_encode, {bad_char, C}}) + end, + json_encode_string_utf8(Cs, Acc1) + end. + +json_encode_string_unicode([], Acc) -> + lists:reverse([$\" | Acc]); +json_encode_string_unicode([C | Cs], Acc) -> + Acc1 = case C of + ?Q -> + [?Q, $\\ | Acc]; + $/ -> + [$/, $\\ | Acc]; + $\\ -> + [$\\, $\\ | Acc]; + $\b -> + [$b, $\\ | Acc]; + $\f -> + [$f, $\\ | Acc]; + $\n -> + [$n, $\\ | Acc]; + $\r -> + [$r, $\\ | Acc]; + $\t -> + [$t, $\\ | Acc]; + C when C >= 0, C < $\s; C >= 16#7f, C =< 16#10FFFF -> + [unihex(C) | Acc]; + C when C < 16#7f -> + [C | Acc]; + _ -> + exit({json_encode, {bad_char, C}}) + end, + json_encode_string_unicode(Cs, Acc1). + +dehex(C) when C >= $0, C =< $9 -> + C - $0; +dehex(C) when C >= $a, C =< $f -> + C - $a + 10; +dehex(C) when C >= $A, C =< $F -> + C - $A + 10. + +hexdigit(C) when C >= 0, C =< 9 -> + C + $0; +hexdigit(C) when C =< 15 -> + C + $a - 10. + +unihex(C) when C < 16#10000 -> + <<D3:4, D2:4, D1:4, D0:4>> = <<C:16>>, + Digits = [hexdigit(D) || D <- [D3, D2, D1, D0]], + [$\\, $u | Digits]; +unihex(C) when C =< 16#10FFFF -> + N = C - 16#10000, + S1 = 16#d800 bor ((N bsr 10) band 16#3ff), + S2 = 16#dc00 bor (N band 16#3ff), + [unihex(S1), unihex(S2)]. + +json_decode(B, S) when is_binary(B) -> + json_decode([B], S); +json_decode(L, S) -> + {Res, L1, S1} = decode1(L, S), + {eof, [], _} = tokenize(L1, S1#decoder{state=trim}), + Res. + +decode1(L, S=#decoder{state=null}) -> + case tokenize(L, S#decoder{state=any}) of + {{const, C}, L1, S1} -> + {C, L1, S1}; + {start_array, L1, S1} -> + decode_array(L1, S1#decoder{state=any}, []); + {start_object, L1, S1} -> + decode_object(L1, S1#decoder{state=key}, []) + end. + +make_object(V, #decoder{object_hook=null}) -> + V; +make_object(V, #decoder{object_hook=Hook}) -> + Hook(V). + +decode_object(L, S=#decoder{state=key}, Acc) -> + case tokenize(L, S) of + {end_object, Rest, S1} -> + V = make_object({obj, lists:reverse(Acc)}, S1), + {V, Rest, S1#decoder{state=null}}; + {{const, K}, Rest, S1} when is_list(K) -> + {colon, L2, S2} = tokenize(Rest, S1), + {V, L3, S3} = decode1(L2, S2#decoder{state=null}), + decode_object(L3, S3#decoder{state=comma}, [{K, V} | Acc]) + end; +decode_object(L, S=#decoder{state=comma}, Acc) -> + case tokenize(L, S) of + {end_object, Rest, S1} -> + V = make_object({obj, lists:reverse(Acc)}, S1), + {V, Rest, S1#decoder{state=null}}; + {comma, Rest, S1} -> + decode_object(Rest, S1#decoder{state=key}, Acc) + end. + +decode_array(L, S=#decoder{state=any}, Acc) -> + case tokenize(L, S) of + {end_array, Rest, S1} -> + {list_to_tuple(lists:reverse(Acc)), Rest, S1#decoder{state=null}}; + {start_array, Rest, S1} -> + {Array, Rest1, S2} = decode_array(Rest, S1#decoder{state=any}, []), + decode_array(Rest1, S2#decoder{state=comma}, [Array | Acc]); + {start_object, Rest, S1} -> + {Array, Rest1, S2} = decode_object(Rest, S1#decoder{state=key}, []), + decode_array(Rest1, S2#decoder{state=comma}, [Array | Acc]); + {{const, Const}, Rest, S1} -> + decode_array(Rest, S1#decoder{state=comma}, [Const | Acc]) + end; +decode_array(L, S=#decoder{state=comma}, Acc) -> + case tokenize(L, S) of + {end_array, Rest, S1} -> + {list_to_tuple(lists:reverse(Acc)), Rest, S1#decoder{state=null}}; + {comma, Rest, S1} -> + decode_array(Rest, S1#decoder{state=any}, Acc) + end. + +tokenize_string(IoList=[C | _], S=#decoder{input_encoding=utf8}, Acc) + when is_list(C); is_binary(C); C >= 16#7f -> + List = xmerl_ucs:from_utf8(list_to_binary(lists:flatten(IoList))), + tokenize_string(List, S#decoder{input_encoding=unicode}, Acc); +tokenize_string("\"" ++ Rest, S, Acc) -> + {lists:reverse(Acc), Rest, ?INC_COL(S)}; +tokenize_string("\\\"" ++ Rest, S, Acc) -> + tokenize_string(Rest, ?ADV_COL(S, 2), [$\" | Acc]); +tokenize_string("\\\\" ++ Rest, S, Acc) -> + tokenize_string(Rest, ?ADV_COL(S, 2), [$\\ | Acc]); +tokenize_string("\\/" ++ Rest, S, Acc) -> + tokenize_string(Rest, ?ADV_COL(S, 2), [$/ | Acc]); +tokenize_string("\\b" ++ Rest, S, Acc) -> + tokenize_string(Rest, ?ADV_COL(S, 2), [$\b | Acc]); +tokenize_string("\\f" ++ Rest, S, Acc) -> + tokenize_string(Rest, ?ADV_COL(S, 2), [$\\ | Acc]); +tokenize_string("\\n" ++ Rest, S, Acc) -> + tokenize_string(Rest, ?ADV_COL(S, 2), [$\n | Acc]); +tokenize_string("\\r" ++ Rest, S, Acc) -> + tokenize_string(Rest, ?ADV_COL(S, 2), [$\r | Acc]); +tokenize_string("\\t" ++ Rest, S, Acc) -> + tokenize_string(Rest, ?ADV_COL(S, 2), [$\t | Acc]); +tokenize_string([$\\, $u, C3, C2, C1, C0 | Rest], S, Acc) -> + % coalesce UTF-16 surrogate pair? + C = dehex(C0) bor + (dehex(C1) bsl 4) bor + (dehex(C2) bsl 8) bor + (dehex(C3) bsl 12), + tokenize_string(Rest, ?ADV_COL(S, 6), [C | Acc]); +tokenize_string([C | Rest], S, Acc) when C >= $\s; C < 16#10FFFF -> + tokenize_string(Rest, ?ADV_COL(S, 1), [C | Acc]). + +tokenize_number(IoList=[C | _], Mode, S=#decoder{input_encoding=utf8}, Acc) + when is_list(C); is_binary(C); C >= 16#7f -> + List = xmerl_ucs:from_utf8(list_to_binary(lists:flatten(IoList))), + tokenize_number(List, Mode, S#decoder{input_encoding=unicode}, Acc); +tokenize_number([$- | Rest], sign, S, []) -> + tokenize_number(Rest, int, ?INC_COL(S), [$-]); +tokenize_number(Rest, sign, S, []) -> + tokenize_number(Rest, int, S, []); +tokenize_number([$0 | Rest], int, S, Acc) -> + tokenize_number(Rest, frac, ?INC_COL(S), [$0 | Acc]); +tokenize_number([C | Rest], int, S, Acc) when C >= $1, C =< $9 -> + tokenize_number(Rest, int1, ?INC_COL(S), [C | Acc]); +tokenize_number([C | Rest], int1, S, Acc) when C >= $0, C =< $9 -> + tokenize_number(Rest, int1, ?INC_COL(S), [C | Acc]); +tokenize_number(Rest, int1, S, Acc) -> + tokenize_number(Rest, frac, S, Acc); +tokenize_number([$., C | Rest], frac, S, Acc) when C >= $0, C =< $9 -> + tokenize_number(Rest, frac1, ?ADV_COL(S, 2), [C, $. | Acc]); +tokenize_number([E | Rest], frac, S, Acc) when E == $e; E == $E -> + tokenize_number(Rest, esign, ?INC_COL(S), [$e, $0, $. | Acc]); +tokenize_number(Rest, frac, S, Acc) -> + {{int, lists:reverse(Acc)}, Rest, S}; +tokenize_number([C | Rest], frac1, S, Acc) when C >= $0, C =< $9 -> + tokenize_number(Rest, frac1, ?INC_COL(S), [C | Acc]); +tokenize_number([E | Rest], frac1, S, Acc) when E == $e; E == $E -> + tokenize_number(Rest, esign, ?INC_COL(S), [$e | Acc]); +tokenize_number(Rest, frac1, S, Acc) -> + {{float, lists:reverse(Acc)}, Rest, S}; +tokenize_number([C | Rest], esign, S, Acc) when C == $-; C == $+ -> + tokenize_number(Rest, eint, ?INC_COL(S), [C | Acc]); +tokenize_number(Rest, esign, S, Acc) -> + tokenize_number(Rest, eint, S, Acc); +tokenize_number([C | Rest], eint, S, Acc) when C >= $0, C =< $9 -> + tokenize_number(Rest, eint1, ?INC_COL(S), [C | Acc]); +tokenize_number([C | Rest], eint1, S, Acc) when C >= $0, C =< $9 -> + tokenize_number(Rest, eint1, ?INC_COL(S), [C | Acc]); +tokenize_number(Rest, eint1, S, Acc) -> + {{float, lists:reverse(Acc)}, Rest, S}. + +tokenize([], S=#decoder{state=trim}) -> + {eof, [], S}; +tokenize([L | Rest], S) when is_list(L) -> + tokenize(L ++ Rest, S); +tokenize([B | Rest], S) when is_binary(B) -> + tokenize(xmerl_ucs:from_utf8(B) ++ Rest, S); +tokenize("\r\n" ++ Rest, S) -> + tokenize(Rest, ?INC_LINE(S)); +tokenize("\n" ++ Rest, S) -> + tokenize(Rest, ?INC_LINE(S)); +tokenize([C | Rest], S) when C == $\s; C == $\t -> + tokenize(Rest, ?INC_COL(S)); +tokenize("{" ++ Rest, S) -> + {start_object, Rest, ?INC_COL(S)}; +tokenize("}" ++ Rest, S) -> + {end_object, Rest, ?INC_COL(S)}; +tokenize("[" ++ Rest, S) -> + {start_array, Rest, ?INC_COL(S)}; +tokenize("]" ++ Rest, S) -> + {end_array, Rest, ?INC_COL(S)}; +tokenize("," ++ Rest, S) -> + {comma, Rest, ?INC_COL(S)}; +tokenize(":" ++ Rest, S) -> + {colon, Rest, ?INC_COL(S)}; +tokenize("null" ++ Rest, S) -> + {{const, null}, Rest, ?ADV_COL(S, 4)}; +tokenize("true" ++ Rest, S) -> + {{const, true}, Rest, ?ADV_COL(S, 4)}; +tokenize("false" ++ Rest, S) -> + {{const, false}, Rest, ?ADV_COL(S, 5)}; +tokenize("\"" ++ Rest, S) -> + {String, Rest1, S1} = tokenize_string(Rest, ?INC_COL(S), []), + {{const, xmerl_ucs:to_utf8(String)}, Rest1, S1}; +tokenize(L=[C | _], S) when C >= $0, C =< $9; C == $- -> + case tokenize_number(L, sign, S, []) of + {{int, Int}, Rest, S1} -> + {{const, list_to_integer(Int)}, Rest, S1}; + {{float, Float}, Rest, S1} -> + {{const, list_to_float(Float)}, Rest, S1} + end. + +%% testing constructs borrowed from the Yaws JSON implementation. + +%% Create an object from a list of Key/Value pairs. + +obj_new() -> + {obj, []}. + +is_obj({obj, Props}) -> + F = fun ({K, _}) when is_list(K) -> + true; + (_) -> + false + end, + lists:all(F, Props). + +obj_from_list(Props) -> + Obj = {obj, Props}, + case is_obj(Obj) of + true -> Obj; + false -> exit(json_bad_object) + end. + +%% Test for equivalence of Erlang terms. +%% Due to arbitrary order of construction, equivalent objects might +%% compare unequal as erlang terms, so we need to carefully recurse +%% through aggregates (tuples and objects). + +equiv({obj, Props1}, {obj, Props2}) -> + equiv_object(Props1, Props2); +equiv(T1, T2) when is_tuple(T1), is_tuple(T2) -> + equiv_list(tuple_to_list(T1), tuple_to_list(T2)); +equiv(N1, N2) when is_number(N1), is_number(N2) -> N1 == N2; +equiv(S1, S2) when is_list(S1), is_list(S2) -> S1 == S2; +equiv(true, true) -> true; +equiv(false, false) -> true; +equiv(null, null) -> true. + +%% Object representation and traversal order is unknown. +%% Use the sledgehammer and sort property lists. + +equiv_object(Props1, Props2) -> + L1 = lists:keysort(1, Props1), + L2 = lists:keysort(1, Props2), + Pairs = lists:zip(L1, L2), + true = lists:all(fun({{K1, V1}, {K2, V2}}) -> + equiv(K1, K2) and equiv(V1, V2) + end, Pairs). + +%% Recursively compare tuple elements for equivalence. + +equiv_list([], []) -> + true; +equiv_list([V1 | L1], [V2 | L2]) -> + case equiv(V1, V2) of + true -> + equiv_list(L1, L2); + false -> + false + end. + +test_all() -> + test_one(e2j_test_vec(utf8), 1). + +test_one([], N) -> + io:format("~p tests passed~n", [N-1]), + ok; +test_one([{E, J} | Rest], N) -> + io:format("[~p] ~p ~p~n", [N, E, J]), + true = equiv(E, decode(J)), + true = equiv(E, decode(encode(E))), + test_one(Rest, 1+N). + +e2j_test_vec(unicode) -> + [ + {"foo" ++ [500] ++ "bar", [$", $f, $o, $o, 500, $b, $a, $r, $"]} + ]; +e2j_test_vec(utf8) -> + [ + {1, "1"}, + {3.1416, "3.14160"}, % text representation may truncate, trail zeroes + {-1, "-1"}, + {-3.1416, "-3.14160"}, + {12.0e10, "1.20000e+11"}, + {1.234E+10, "1.23400e+10"}, + {-1.234E-10, "-1.23400e-10"}, + {10.0, "1.0e+01"}, + {123.456, "1.23456E+2"}, + {10.0, "1e1"}, + {"foo", "\"foo\""}, + {"foo" ++ [5] ++ "bar", "\"foo\\u0005bar\""}, + {"", "\"\""}, + {[], "\"\""}, + {"\n\n\n", "\"\\n\\n\\n\""}, + {obj_new(), "{}"}, + {obj_from_list([{"foo", "bar"}]), "{\"foo\":\"bar\"}"}, + {obj_from_list([{"foo", "bar"}, {"baz", 123}]), + "{\"foo\":\"bar\",\"baz\":123}"}, + {{}, "[]"}, + {{{}}, "[[]]"}, + {{1, "foo"}, "[1,\"foo\"]"}, + + % json array in a json object + {obj_from_list([{"foo", {123}}]), + "{\"foo\":[123]}"}, + + % json object in a json object + {obj_from_list([{"foo", obj_from_list([{"bar", true}])}]), + "{\"foo\":{\"bar\":true}}"}, + + % fold evaluation order + {obj_from_list([{"foo", {}}, + {"bar", obj_from_list([{"baz", true}])}, + {"alice", "bob"}]), + "{\"foo\":[],\"bar\":{\"baz\":true},\"alice\":\"bob\"}"}, + + % json object in a json array + {{-123, "foo", obj_from_list([{"bar", {}}]), null}, + "[-123,\"foo\",{\"bar\":[]},null]"} + ]. diff --git a/src/couchdb/couch.app.tpl.in b/src/couchdb/couch.app.tpl.in new file mode 100644 index 00000000..5ddf0989 --- /dev/null +++ b/src/couchdb/couch.app.tpl.in @@ -0,0 +1,29 @@ +{application,couch, + [{description,"@package_name@"}, + {vsn,"@version@"}, + {modules,[couch_btree, + cjson, + couch_db, + couch_doc, + couch_query_servers, + couch_file, + couch_server, + couch_server_sup, + couch_stream, + couch_key_tree, + couch_view, + couch_util, + mod_couch, + couch_event_sup, + couch_db_update_notifier, + couch_ft_query, + couch_log, + couch_rep]}, + {registered,[couch_server, + couch_server_sup, + couch_util, + couch_view, + couch_query_servers, + couch_ft_query]}, + {applications,[kernel,stdlib,xmerl,couch_inets]}, + {mod,{couch_server,[]}}]}. diff --git a/src/couchdb/couch_btree.erl b/src/couchdb/couch_btree.erl new file mode 100644 index 00000000..2ae837dd --- /dev/null +++ b/src/couchdb/couch_btree.erl @@ -0,0 +1,590 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_btree). + +-export([open/2, open/3, query_modify/4, add_remove/3, foldl/3, foldl/4]). +-export([foldr/3, foldr/4, fold/4, fold/5, row_count/1]). +-export([lookup/2, get_state/1, test/1, test/0]). + +-define(CHUNK_THRESHOLD, 16#fff). + +-record(btree, + {fd, + root, + extract_kv = fun({Key, Value}) -> {Key, Value} end, + assemble_kv = fun(Key, Value) -> {Key, Value} end, + less = fun(A, B) -> A < B end + }). + +extract(#btree{extract_kv=Extract}, Value) -> + Extract(Value). + +assemble(#btree{assemble_kv=Assemble}, Key, Value) -> + Assemble(Key, Value). + +less(#btree{less=Less}, A, B) -> + Less(A, B). + +% pass in 'nil' for State if a new Btree. +open(State, Fd) -> + {ok, #btree{root=State, fd=Fd}}. + +set_options(Bt, []) -> + Bt; +set_options(Bt, [{split, Extract}|Rest]) -> + set_options(Bt#btree{extract_kv=Extract}, Rest); +set_options(Bt, [{join, Assemble}|Rest]) -> + set_options(Bt#btree{assemble_kv=Assemble}, Rest); +set_options(Bt, [{less, Less}|Rest]) -> + set_options(Bt#btree{less=Less}, Rest). + +open(State, Fd, Options) -> + {ok, set_options(#btree{root=State, fd=Fd}, Options)}. + +get_state(#btree{root=Root}) -> + Root. + +row_count(#btree{root=nil}) -> + 0; +row_count(#btree{root={_RootPointer, Count}}) -> + Count. + +foldl(Bt, Fun, Acc) -> + fold(Bt, fwd, Fun, Acc). + +foldl(Bt, Key, Fun, Acc) -> + fold(Bt, Key, fwd, Fun, Acc). + +foldr(Bt, Fun, Acc) -> + fold(Bt, rev, Fun, Acc). + +foldr(Bt, Key, Fun, Acc) -> + fold(Bt, Key, rev, Fun, Acc). + +% wraps a 2 arity function with the proper 3 arity function +convert_fun_arity(Fun) when is_function(Fun, 2) -> + fun(KV, _Offset, AccIn) -> Fun(KV, AccIn) end; +convert_fun_arity(Fun) when is_function(Fun, 3) -> + Fun. % Already arity 3 + +fold(Bt, Dir, Fun, Acc) -> + {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, nil, Dir, convert_fun_arity(Fun), Acc), + {ok, Acc2}. + +fold(Bt, Key, Dir, Fun, Acc) -> + {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, Key, Dir, convert_fun_arity(Fun), Acc), + {ok, Acc2}. + +add_remove(Bt, InsertKeyValues, RemoveKeys) -> + {Result, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys), + {Result, Bt2}. + +query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) -> + #btree{root=Root} = Bt, + InsertActions = lists:map( + fun(KeyValue) -> + {Key, Value} = extract(Bt, KeyValue), + {insert, Key, Value} + end, InsertValues), + RemoveActions = [{remove, Key, nil} || Key <- RemoveKeys], + FetchActions = [{fetch, Key, nil} || Key <- LookupKeys], + SortFun = + fun({OpA, A, _}, {OpB, B, _}) -> + case less(Bt, A, B) of + true -> true; + false -> + case less(Bt, B, A) of + true -> false; + false -> + % A and B are equal, sort by op. + op_order(OpA) < op_order(OpB) + end + end + end, + Actions = lists:sort(SortFun, lists:append([InsertActions, RemoveActions, FetchActions])), + {ok, KeyPointers, QueryResults, Bt2} = modify_node(Bt, Root, Actions, []), + {ok, NewRoot, Bt3} = complete_root(Bt2, KeyPointers), + {ok, QueryResults, Bt3#btree{root=NewRoot}}. + +% for ordering different operatations with the same key. +% fetch < remove < insert +op_order(fetch) -> 1; +op_order(remove) -> 2; +op_order(insert) -> 3. + +lookup(#btree{root=Root, less=Less}=Bt, Keys) -> + SortedKeys = lists:sort(Less, Keys), + {ok, SortedResults} = lookup(Bt, Root, SortedKeys), + % We want to return the results in the same order as the keys were input + % but we may have changed the order when we sorted. So we need to put the + % order back into the results. + KeyDict = dict:from_list(SortedResults), + [dict:fetch(Key, KeyDict) || Key <- Keys]. + +lookup(_Bt, nil, Keys) -> + {ok, [{Key, not_found} || Key <- Keys]}; +lookup(Bt, {Pointer, _Count}, Keys) -> + {NodeType, NodeList} = get_node(Bt, Pointer), + case NodeType of + kp_node -> + lookup_kpnode(Bt, NodeList, Keys, []); + kv_node -> + lookup_kvnode(Bt, NodeList, Keys, []) + end. + + +lookup_kpnode(_Bt, [], Keys, Output) -> + {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])}; + +lookup_kpnode(_Bt, _KPs, [], Output) -> + {ok, lists:reverse(Output)}; + +lookup_kpnode(Bt, [{Key, PointerInfo} | RestKPs], LookupKeys, Output) -> + % Split the Keys into two lists, queries of values less + % than equals, and greater than the current key + SplitFun = fun(LookupKey) -> not less(Bt, Key, LookupKey) end, + case lists:splitwith(SplitFun, LookupKeys) of + {[], GreaterQueries} -> + lookup_kpnode(Bt, RestKPs, GreaterQueries, Output); + {LessEqQueries, GreaterQueries} -> + {ok, Results} = lookup(Bt, PointerInfo, LessEqQueries), + lookup_kpnode(Bt, RestKPs, GreaterQueries, lists:reverse(Results, Output)) + end. + + + +lookup_kvnode(_Bt, _KVs, [], Output) -> + {ok, lists:reverse(Output)}; +lookup_kvnode(_Bt, [], Keys, Output) -> + % keys not found + {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])}; +lookup_kvnode(Bt, [{Key, Value} | RestKVs], [LookupKey | RestLookupKeys], Output) -> + case less(Bt, LookupKey, Key) of + true -> + lookup_kvnode(Bt, [{Key, Value} | RestKVs], RestLookupKeys, [{LookupKey, not_found} | Output]); + false -> + case less(Bt, Key, LookupKey) of + true -> + % LookupKey is greater than Key + lookup_kvnode(Bt, RestKVs, [LookupKey | RestLookupKeys], Output); + false -> + % LookupKey is equal to Key + lookup_kvnode(Bt, RestKVs, RestLookupKeys, [{LookupKey, {ok, assemble(Bt, LookupKey, Value)}} | Output]) + end + end. + + +complete_root(Bt, []) -> + {ok, nil, Bt}; +complete_root(Bt, [{_Key, PointerInfo}])-> + {ok, PointerInfo, Bt}; +complete_root(Bt, KPs) -> + {ok, ResultKeyPointers, Bt2} = write_node(Bt, kp_node, KPs), + complete_root(Bt2, ResultKeyPointers). + +%%%%%%%%%%%%% The chunkify function sucks! %%%%%%%%%%%%% +% It is inaccurate as it does not account for compression when blocks are +% written. Plus with the "case size(term_to_binary(InList)) of" code it's +% probably really inefficient. + +chunkify(_Bt, []) -> + []; +chunkify(Bt, InList) -> + case size(term_to_binary(InList)) of + Size when Size > ?CHUNK_THRESHOLD -> + NumberOfChunksLikely = ((Size div ?CHUNK_THRESHOLD) + 1), + ChunkThreshold = Size div NumberOfChunksLikely, + chunkify(Bt, InList, ChunkThreshold, [], 0, []); + _Else -> + [InList] + end. + +chunkify(_Bt, [], _ChunkThreshold, [], 0, OutputChunks) -> + lists:reverse(OutputChunks); +chunkify(_Bt, [], _ChunkThreshold, OutList, _OutListSize, OutputChunks) -> + lists:reverse([lists:reverse(OutList) | OutputChunks]); +chunkify(Bt, [InElement | RestInList], ChunkThreshold, OutList, OutListSize, OutputChunks) -> + case size(term_to_binary(InElement)) of + Size when (Size + OutListSize) > ChunkThreshold -> + chunkify(Bt, RestInList, ChunkThreshold, [], 0, [lists:reverse([InElement | OutList]) | OutputChunks]); + Size -> + chunkify(Bt, RestInList, ChunkThreshold, [InElement | OutList], OutListSize + Size, OutputChunks) + end. + +modify_node(Bt, RootPointerInfo, Actions, QueryOutput) -> + case RootPointerInfo of + nil -> + NodeType = kv_node, + NodeList = []; + {Pointer, _count} -> + {NodeType, NodeList} = get_node(Bt, Pointer) + end, + case NodeType of + kp_node -> + {ok, NewNodeList, QueryOutput2, Bt2} = modify_kpnode(Bt, NodeList, Actions, [], QueryOutput); + kv_node -> + {ok, NewNodeList, QueryOutput2, Bt2} = modify_kvnode(Bt, NodeList, Actions, [], QueryOutput) + end, + case NewNodeList of + [] -> % no nodes remain + {ok, [], QueryOutput2, Bt2}; + NodeList -> % nothing changed + {LastKey, _LastValue} = lists:last(NodeList), + {ok, [{LastKey, RootPointerInfo}], QueryOutput2, Bt2}; + _Else2 -> + {ok, ResultList, Bt3} = write_node(Bt2, NodeType, NewNodeList), + {ok, ResultList, QueryOutput2, Bt3} + end. + + +count(kv_node, NodeList) -> + length(NodeList); +count(kp_node, NodeList) -> + lists:foldl( fun({_Key, {_Pointer, Count}}, AccCount) -> + Count + AccCount + end, + 0, NodeList). + + +get_node(#btree{fd = Fd}, NodePos) -> + {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos), + case NodeType of + kp_node -> + % Node pointers always point backward on disk. + % Validating this prevents infinite loops should + % a disk corruption occur. + [throw({error, disk_corruption}) + || {_Key, {SubNodePos, _Count}} + <- NodeList, SubNodePos >= NodePos]; + kv_node -> + ok + end, + {NodeType, NodeList}. + +write_node(Bt, NodeType, NodeList) -> + % split up nodes into smaller sizes + NodeListList = chunkify(Bt, NodeList), + % now write out each chunk and return the KeyPointer pairs for those nodes + ResultList = [ + begin + {ok, Pointer} = couch_file:append_term(Bt#btree.fd, {NodeType, ANodeList}), + {LastKey, _} = lists:last(ANodeList), + {LastKey, {Pointer, count(NodeType, ANodeList)}} + end + || + ANodeList <- NodeListList + ], + {ok, ResultList, Bt}. + +modify_kpnode(Bt, KPs, [], ResultNode, QueryOutput) -> + % processed all queries for the current tree + {ok, lists:reverse(ResultNode, KPs), QueryOutput, Bt}; + +modify_kpnode(Bt, [], Actions, [{_Key, PointerInfo} | ResultNode], QueryOutput) -> + {ok, ChildKPs, QueryOutput2, Bt2} = modify_node(Bt, PointerInfo, Actions, QueryOutput), + {ok, lists:reverse(ResultNode, ChildKPs), QueryOutput2, Bt2}; + +modify_kpnode(Bt, [{Key,PointerInfo} | RestKPs], Actions, ResultNode, QueryOutput) -> + % Split the actions into two lists, queries of values less + % than equals, and greater than the current key + SplitFun = fun({_ActionType, ActionKey, _ActionValue}) -> + not less(Bt, Key, ActionKey) + end, + case lists:splitwith(SplitFun, Actions) of + {[], GreaterQueries} -> + modify_kpnode(Bt, RestKPs, GreaterQueries, [{Key, PointerInfo} | ResultNode], QueryOutput); + {LessEqQueries, GreaterQueries} -> + {ok, ChildKPs, QueryOutput2, Bt2} = modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput), + modify_kpnode(Bt2, RestKPs, GreaterQueries, lists:reverse(ChildKPs, ResultNode), QueryOutput2) + end. + +modify_kvnode(Bt, KVs, [], ResultNode, QueryOutput) -> + {ok, lists:reverse(ResultNode, KVs), QueryOutput, Bt}; +modify_kvnode(Bt, [], [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, QueryOutput) -> + case ActionType of + insert -> + modify_kvnode(Bt, [], RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput); + remove -> + % just drop the action + modify_kvnode(Bt, [], RestActions, ResultNode, QueryOutput); + fetch -> + % the key/value must not exist in the tree + modify_kvnode(Bt, [], RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput]) + end; +modify_kvnode(Bt, [{Key, Value} | RestKVs], [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, QueryOutput) -> + case less(Bt, ActionKey, Key) of + true -> + case ActionType of + insert -> + % ActionKey is less than the Key, so insert + modify_kvnode(Bt, [{Key, Value} | RestKVs], RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput); + remove -> + % ActionKey is less than the Key, just drop the action + modify_kvnode(Bt, [{Key, Value} | RestKVs], RestActions, ResultNode, QueryOutput); + fetch -> + % ActionKey is less than the Key, the key/value must not exist in the tree + modify_kvnode(Bt, [{Key, Value} | RestKVs], RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput]) + end; + false -> + case less(Bt, Key, ActionKey) of + true -> + % ActionKey is greater than Key + modify_kvnode(Bt, RestKVs, [{ActionType, ActionKey, ActionValue} | RestActions], [{Key, Value} | ResultNode], QueryOutput); + false -> + % InsertKey is equal to Key + case ActionType of + insert -> + % ActionKey is less than the Key, so insert + modify_kvnode(Bt, RestKVs, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput); + remove -> + modify_kvnode(Bt, RestKVs, RestActions, ResultNode, QueryOutput); + fetch -> + % ActionKey is equal to the Key, insert into the QueryOuput, but re-process the node + % since an identical action key can follow it. + modify_kvnode(Bt, [{Key, Value} | RestKVs], RestActions, ResultNode, [{ok, assemble(Bt, Key, Value)} | QueryOutput]) + end + end + end. + +adjust_dir(fwd, List) -> + List; +adjust_dir(rev, List) -> + lists:reverse(List). + +stream_node(Bt, Offset, PointerInfo, nil, Dir, Fun, Acc) -> + stream_node(Bt, Offset, PointerInfo, Dir, Fun, Acc); +stream_node(_Bt, _Offset, nil, _StartKey, _Dir, _Fun, Acc) -> + {ok, Acc}; +stream_node(Bt, Offset, {Pointer, _Count}, StartKey, Dir, Fun, Acc) -> + {NodeType, NodeList} = get_node(Bt, Pointer), + case NodeType of + kp_node -> + stream_kp_node(Bt, Offset, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc); + kv_node -> + stream_kv_node(Bt, Offset, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc) + end. + +stream_node(_Bt, _Offset, nil, _Dir, _Fun, Acc) -> + {ok, Acc}; +stream_node(Bt, Offset, {Pointer, _Count}, Dir, Fun, Acc) -> + {NodeType, NodeList} = get_node(Bt, Pointer), + case NodeType of + kp_node -> + stream_kp_node(Bt, Offset, adjust_dir(Dir, NodeList), Dir, Fun, Acc); + kv_node -> + stream_kv_node(Bt, Offset, adjust_dir(Dir, NodeList), Dir, Fun, Acc) + end. + +stream_kp_node(_Bt, _Offset, [], _Dir, _Fun, Acc) -> + {ok, Acc}; +stream_kp_node(Bt, Offset, [{_Key, {Pointer, Count}} | Rest], Dir, Fun, Acc) -> + case stream_node(Bt, Offset, {Pointer, Count}, Dir, Fun, Acc) of + {ok, Acc2} -> + stream_kp_node(Bt, Offset + Count, Rest, Dir, Fun, Acc2); + {stop, Acc2} -> + {stop, Acc2} + end. + +drop_nodes(_Bt, Offset, _StartKey, []) -> + {Offset, []}; +drop_nodes(Bt, Offset, StartKey, [{NodeKey, {Pointer, Count}} | RestKPs]) -> + case less(Bt, NodeKey, StartKey) of + true -> drop_nodes(Bt, Offset + Count, StartKey, RestKPs); + false -> {Offset, [{NodeKey, {Pointer, Count}} | RestKPs]} + end. + +stream_kp_node(Bt, Offset, KPs, StartKey, Dir, Fun, Acc) -> + {NewOffset, NodesToStream} = + case Dir of + fwd -> + % drop all nodes sorting before the key + drop_nodes(Bt, Offset, StartKey, KPs); + rev -> + % keep all nodes sorting before the key, AND the first node to sort after + RevKPs = lists:reverse(KPs), + case lists:splitwith(fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end, RevKPs) of + {_RevBefore, []} -> + % everything sorts before it + {Offset, KPs}; + {RevBefore, [FirstAfter | Drop]} -> + {Offset + count(kp_node, Drop), [FirstAfter | lists:reverse(RevBefore)]} + end + end, + case NodesToStream of + [] -> + {ok, Acc}; + [{_Key, PointerInfo} | Rest] -> + case stream_node(Bt, NewOffset, PointerInfo, StartKey, Dir, Fun, Acc) of + {ok, Acc2} -> + stream_kp_node(Bt, NewOffset, Rest, Dir, Fun, Acc2); + {stop, Acc2} -> + {stop, Acc2} + end + end. + +stream_kv_node(_Bt, _Offset, [], _Dir, _Fun, Acc) -> + {ok, Acc}; +stream_kv_node(Bt, Offset, [{K, V} | RestKVs], Dir, Fun, Acc) -> + case Fun(assemble(Bt, K, V), Offset, Acc) of + {ok, Acc2} -> + stream_kv_node(Bt, Offset + 1, RestKVs, Dir, Fun, Acc2); + {stop, Acc2} -> + {stop, Acc2} + end. + +stream_kv_node(Bt, Offset, KVs, StartKey, Dir, Fun, Acc) -> + DropFun = + case Dir of + fwd -> + fun({Key, _}) -> less(Bt, Key, StartKey) end; + rev -> + fun({Key, _}) -> less(Bt, StartKey, Key) end + end, + % drop all nodes preceding the key + GTEKVs = lists:dropwhile(DropFun, KVs), + LenSkipped = length(KVs) - length(GTEKVs), + stream_kv_node(Bt, Offset + LenSkipped, GTEKVs, Dir, Fun, Acc). + + + + +test()-> + test(1000). + +test(N) -> + KeyValues = [{random:uniform(), random:uniform()} || _Seq <- lists:seq(1, N)], + test_btree(KeyValues), % randomly distributed + Sorted = lists:sort(KeyValues), + test_btree(Sorted), % sorted regular + test_btree(lists:reverse(Sorted)). % sorted reverse + + +test_btree(KeyValues) -> + {ok, Fd} = couch_file:open("foo", [create,overwrite]), + {ok, Btree} = open(nil, Fd), + + % first dump in all the values in one go + {ok, Btree10} = add_remove(Btree, KeyValues, []), + + ok = test_keys(Btree10, KeyValues), + + % remove everything + {ok, Btree20} = test_remove(Btree10, KeyValues), + + % make sure its empty + {ok, false} = foldl(Btree20, fun(_X, _Acc) -> + {ok, true} % change Acc to 'true' + end, + false), + + % add everything back one at a time. + {ok, Btree30} = test_add(Btree20, KeyValues), + + ok = test_keys(Btree30, KeyValues), + + KeyValuesRev = lists:reverse(KeyValues), + + % remove everything, in reverse order + {ok, Btree40} = test_remove(Btree30, KeyValuesRev), + + % make sure its empty + {ok, false} = foldl(Btree40, fun(_X, _Acc) -> + {ok, true} % change Acc to 'true' + end, + false), + + + {A, B} = every_other(KeyValues), + + % add everything back + {ok, Btree50} = test_add(Btree40,KeyValues), + + ok = test_keys(Btree50, KeyValues), + + % remove half the values + {ok, Btree60} = test_remove(Btree50, A), + + % verify the remaining + ok = test_keys(Btree60, B), + + % add A back + {ok, Btree70} = test_add(Btree60, A), + + % verify + ok = test_keys(Btree70, KeyValues), + + % remove B + {ok, Btree80} = test_remove(Btree70, B), + + % verify the remaining + ok = test_keys(Btree80, A), + + ok = couch_file:close(Fd). + + + + +every_other(List) -> + every_other(List, [], [], 1). + +every_other([], AccA, AccB, _Flag) -> + {lists:reverse(AccA), lists:reverse(AccB)}; +every_other([H|T], AccA, AccB, 1) -> + every_other(T, [H|AccA], AccB, 0); +every_other([H|T], AccA, AccB, 0) -> + every_other(T, AccA, [H|AccB], 1). + +test_keys(Btree, List) -> + FoldFun = + fun(Element, [HAcc|TAcc]) -> + Element = HAcc, % must match + {ok, TAcc} + end, + Sorted = lists:sort(List), + {ok, []} = foldl(Btree, FoldFun, Sorted), + {ok, []} = foldr(Btree, FoldFun, lists:reverse(Sorted)), + + test_lookup(Btree, List). + +% Makes sure each key value pair is found in the btree +test_lookup(_Btree, []) -> + ok; +test_lookup(Btree, [{Key, Value} | Rest]) -> + [{ok,{Key, Value}}] = lookup(Btree, [Key]), + {ok, []} = foldl(Btree, Key, fun({KeyIn, ValueIn}, []) -> + KeyIn = Key, + ValueIn = Value, + {stop, []} + end, + []), + {ok, []} = foldr(Btree, Key, fun({KeyIn, ValueIn}, []) -> + KeyIn = Key, + ValueIn = Value, + {stop, []} + end, + []), + test_lookup(Btree, Rest). + +% removes each key one at a time from the btree +test_remove(Btree, []) -> + {ok, Btree}; +test_remove(Btree, [{Key, _Value} | Rest]) -> + {ok, Btree2} = add_remove(Btree,[], [Key]), + test_remove(Btree2, Rest). + +% adds each key one at a time from the btree +test_add(Btree, []) -> + {ok, Btree}; +test_add(Btree, [KeyValue | Rest]) -> + {ok, Btree2} = add_remove(Btree, [KeyValue], []), + test_add(Btree2, Rest). diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl new file mode 100644 index 00000000..e567d27b --- /dev/null +++ b/src/couchdb/couch_db.erl @@ -0,0 +1,757 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_db). +-behaviour(gen_server). + +-export([open/2,create/2,create/3,get_doc_info/2]). +-export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]). +-export([delete_doc/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]). +-export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]). +-export([start_update_loop/1]). +-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). + +-include("couch_db.hrl"). + +-record(db_header, + {write_version = 0, + last_update_seq = 0, + summary_stream_state = nil, + docinfo_by_Id_btree_state = nil, + docinfo_by_seq_btree_state = nil, + local_docs_btree_state = nil, + doc_count=0, + doc_del_count=0 + }). + +-record(db, + {main_pid, + update_pid, + fd, + header = #db_header{}, + summary_stream, + docinfo_by_Id_btree, + docinfo_by_seq_btree, + local_docs_btree, + last_update_seq, + doc_count, + doc_del_count, + name + }). + +start_link(DbName, Filepath, Options) -> + case couch_file:open(Filepath, Options) of + {ok, Fd} -> + Result = gen_server:start_link(couch_db, {DbName, Fd, Options}, []), + unlink(Fd), + Result; + {error, enoent} -> + % couldn't find file + {error, not_found}; + Else -> + Else + end. + +%%% Interface functions %%% + +create(Filepath, Options) -> + create(Filepath, Filepath, Options). + +create(DbName, Filepath, Options) when is_list(Options) -> + start_link(DbName, Filepath, [create | Options]). + +open(DbName, Filepath) -> + start_link(DbName, Filepath, []). + +delete_doc(MainPid, Id, Revisions) -> + DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions], + {ok, [Result]} = update_docs(MainPid, DeletedDocs, [new_edits]), + {ok, Result}. + +open_doc(MainPid, IdOrDocInfo) -> + open_doc(MainPid, IdOrDocInfo, []). + +open_doc(MainPid, Id, Options) -> + case open_doc_int(get_db(MainPid), Id, Options) of + {ok, #doc{deleted=true}=Doc} -> + case lists:member(deleted, Options) of + true -> + {ok, Doc}; + false -> + {not_found, deleted} + end; + Else -> + Else + end. + +open_doc_revs(MainPid, Id, Revs, Options) -> + open_doc_revs_int(get_db(MainPid), Id, Revs, Options). + +get_missing_revs(MainPid, IdRevsList) -> + Ids = [Id1 || {Id1, _Revs} <- IdRevsList], + FullDocInfoResults = get_full_doc_infos(MainPid, Ids), + Results = lists:zipwith( + fun({Id, Revs}, FullDocInfoResult) -> + case FullDocInfoResult of + {ok, #full_doc_info{rev_tree=RevisionTree}} -> + {Id, couch_key_tree:find_missing(RevisionTree, Revs)}; + not_found -> + {Id, Revs} + end + end, + IdRevsList, FullDocInfoResults), + {ok, Results}. + +get_doc_info(Db, Id) -> + case get_full_doc_info(Db, Id) of + {ok, DocInfo} -> + {ok, couch_doc:to_doc_info(DocInfo)}; + Else -> + Else + end. + +% returns {ok, DocInfo} or not_found +get_full_doc_info(Db, Id) -> + [Result] = get_full_doc_infos(Db, [Id]), + Result. + + +get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) -> + get_full_doc_infos(get_db(MainPid), Ids); +get_full_doc_infos(#db{}=Db, Ids) -> + couch_btree:lookup(Db#db.docinfo_by_Id_btree, Ids). + +get_db_info(MainPid) when is_pid(MainPid) -> + get_db_info(get_db(MainPid)); +get_db_info(#db{doc_count=Count, doc_del_count=DelCount, last_update_seq=SeqNum}) -> + InfoList = [ + {doc_count, Count}, + {doc_del_count, DelCount}, + {last_update_seq, SeqNum} + ], + {ok, InfoList}. + +update_doc(MainPid, Doc, Options) -> + {ok, [NewRev]} = update_docs(MainPid, [Doc], Options), + {ok, NewRev}. + +update_docs(MainPid, Docs) -> + update_docs(MainPid, Docs, []). + +% group_alike_docs groups the sorted documents into sublist buckets, by id. +% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]] +group_alike_docs(Docs) -> + Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs), + group_alike_docs(Sorted, []). + +group_alike_docs([], Buckets) -> + lists:reverse(Buckets); +group_alike_docs([Doc|Rest], []) -> + group_alike_docs(Rest, [[Doc]]); +group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) -> + [#doc{id=BucketId}|_] = Bucket, + case Doc#doc.id == BucketId of + true -> + % add to existing bucket + group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]); + false -> + % add to new bucket + group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]]) + end. + + +prepare_doc_for_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, OldFullDocInfo, LeafRevsDict) -> + case PrevRevs of + [PrevRev|_] -> + case dict:find(PrevRev, LeafRevsDict) of + {ok, {Deleted, Sp, DiskRevs}} -> + case couch_doc:has_stubs(Doc) of + true -> + DiskDoc = make_doc(Db, Id, Deleted, Sp, DiskRevs), + Doc2 = couch_doc:merge_stubs(Doc, DiskDoc), + Doc2#doc{revs=[NewRev|DiskRevs]}; + false -> + Doc#doc{revs=[NewRev|DiskRevs]} + end; + error -> + throw(conflict) + end; + [] -> + % new doc, and we have existing revs. + OldDocInfo = couch_doc:to_doc_info(OldFullDocInfo), + if OldDocInfo#doc_info.deleted -> + % existing doc is a deleton + % allow this new doc to be a later revision. + {_Deleted, _Sp, Revs} = dict:fetch(OldDocInfo#doc_info.rev, LeafRevsDict), + Doc#doc{revs=[NewRev|Revs]}; + true -> + throw(conflict) + end + end. + +update_docs(MainPid, Docs, Options) -> + Docs2 = lists:map( + fun(#doc{id=Id,revs=Revs}=Doc) -> + case Id of + ?LOCAL_DOC_PREFIX ++ _ -> + Rev = case Revs of [] -> 0; [Rev0|_] -> list_to_integer(Rev0) end, + Doc#doc{revs=[integer_to_list(Rev + 1)]}; + _ -> + Doc#doc{revs=[integer_to_list(couch_util:rand32()) | Revs]} + end + end, Docs), + DocBuckets = group_alike_docs(Docs2), + Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], + Db = get_db(MainPid), + + % first things first, lookup the doc by id and get the most recent + + ExistingDocs = get_full_doc_infos(Db, Ids), + + DocBuckets2 = lists:zipwith( + fun(Bucket, not_found) -> + % no existing revs, make sure no old revision is specified. + [throw(conflict) || #doc{revs=[_NewRev, _OldRev | _]} <- Bucket], + Bucket; + (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}) -> + Leafs = couch_key_tree:get_all_leafs(OldRevTree), + LeafRevsDict = dict:from_list([{Rev, {Deleted, Sp, Revs}} || {Rev, {Deleted, Sp}, Revs} <- Leafs]), + [prepare_doc_for_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict) || Doc <- Bucket] + end, + DocBuckets, ExistingDocs), + + % flush unwritten binaries to disk. + DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2], + + case gen_server:call(MainPid, {update_docs, DocBuckets3, Options}) of + ok -> + % return back the new rev ids, in the same order input. + {ok, [NewRev || #doc{revs=[NewRev|_]} <- Docs2]}; + Else-> + throw(Else) + end. + +save_docs(MainPid, Docs) -> + save_docs(MainPid, Docs, []). + +save_docs(MainPid, Docs, Options) -> + % flush unwritten binaries to disk. + Db = get_db(MainPid), + DocBuckets = group_alike_docs(Docs), + DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], + ok = gen_server:call(MainPid, {update_docs, DocBuckets2, Options}). + + +doc_flush_binaries(Doc, Fd) -> + % calc size of binaries to write out + Bins = Doc#doc.attachments, + PreAllocSize = lists:foldl( + fun(BinValue, SizeAcc) -> + case BinValue of + {_Key, {_Type, {Fd0, _StreamPointer, _Len}}} when Fd0 == Fd -> + % already written to our file, nothing to write + SizeAcc; + {_Key, {_Type, {_OtherFd, _StreamPointer, Len}}} -> + % written to a different file + SizeAcc + Len; + {_Key, {_Type, Bin}} when is_binary(Bin) -> + SizeAcc + size(Bin) + end + end, + 0, Bins), + + {ok, OutputStream} = couch_stream:open(Fd), + ok = couch_stream:ensure_buffer(OutputStream, PreAllocSize), + + NewBins = lists:map( + fun({Key, {Type, BinValue}}) -> + NewBinValue = + case BinValue of + {Fd0, StreamPointer, Len} when Fd0 == Fd -> + % already written to our file, nothing to write + {Fd, StreamPointer, Len}; + {OtherFd, StreamPointer, Len} -> + % written to a different file (or a closed file + % instance, which will cause an error) + {ok, {NewStreamPointer, Len}, _EndSp} = + couch_stream:foldl(OtherFd, StreamPointer, Len, + fun(Bin, {BeginPointer, SizeAcc}) -> + {ok, Pointer} = couch_stream:write(OutputStream, Bin), + case SizeAcc of + 0 -> % this was the first write, record the pointer + {ok, {Pointer, size(Bin)}}; + _ -> + {ok, {BeginPointer, SizeAcc + size(Bin)}} + end + end, + {{0,0}, 0}), + {Fd, NewStreamPointer, Len}; + Bin when is_binary(Bin), size(Bin) > 0 -> + {ok, StreamPointer} = couch_stream:write(OutputStream, Bin), + {Fd, StreamPointer, size(Bin)} + end, + {Key, {Type, NewBinValue}} + end, Bins), + + {ok, _FinalPos} = couch_stream:close(OutputStream), + + Doc#doc{attachments = NewBins}. + +enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) -> + Db = get_db(MainPid), + couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx). + +enum_docs_since(MainPid, SinceSeq, InFun, Acc) -> + enum_docs_since(MainPid, SinceSeq, fwd, InFun, Acc). + +enum_docs(MainPid, StartId, Direction, InFun, InAcc) -> + Db = get_db(MainPid), + couch_btree:fold(Db#db.docinfo_by_Id_btree, StartId, Direction, InFun, InAcc). + +enum_docs(MainPid, StartId, InFun, Ctx) -> + enum_docs(MainPid, StartId, fwd, InFun, Ctx). + +close(MainPid) -> + Ref = erlang:monitor(process, MainPid), + unlink(MainPid), + exit(MainPid, normal), + receive + {'DOWN', Ref, process, MainPid, _Reason} -> + ok + end. + + +% server functions + +init({DbName, Fd, Options}) -> + link(Fd), + case lists:member(create, Options) of + true -> + % create a new header and writes it to the file + Header = #db_header{}, + ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header), + ok = couch_file:sync(Fd), + init_main(DbName, Fd, Header); + false -> + {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>), + init_main(DbName, Fd, Header) + end. + +btree_by_seq_split(DocInfo) -> + #doc_info{ + id = Id, + rev = Rev, + update_seq = Seq, + summary_pointer = Sp, + conflict_revs = Conflicts, + deleted_conflict_revs = DelConflicts, + deleted = Deleted} = DocInfo, + {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}. + +btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) -> + #doc_info{ + id = Id, + rev = Rev, + update_seq = Seq, + summary_pointer = Sp, + conflict_revs = Conflicts, + deleted_conflict_revs = DelConflicts, + deleted = Deleted}. + +btree_by_name_split(#full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}) -> + {Id, {Seq, Tree}}. + +btree_by_name_join(Id, {Seq, Tree}) -> + #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}. + + +init_main(DbName, Fd, Header) -> + {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd), + ok = couch_stream:set_min_buffer(SummaryStream, 10000), + {ok, IdBtree} = couch_btree:open(Header#db_header.docinfo_by_Id_btree_state, Fd, + [{split, fun(V) -> btree_by_name_split(V) end}, + {join, fun(K,V) -> btree_by_name_join(K,V) end}] ), + {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd, + [{split, fun(V) -> btree_by_seq_split(V) end}, + {join, fun(K,V) -> btree_by_seq_join(K,V) end}] ), + {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd), + + Db = #db{ + main_pid=self(), + fd=Fd, + header=Header, + summary_stream = SummaryStream, + docinfo_by_Id_btree = IdBtree, + docinfo_by_seq_btree = SeqBtree, + local_docs_btree = LocalDocsBtree, + last_update_seq = Header#db_header.last_update_seq, + doc_count = Header#db_header.doc_count, + doc_del_count = Header#db_header.doc_del_count, + name = DbName + }, + + UpdatePid = spawn_link(couch_db, start_update_loop, [Db]), + + {ok, Db#db{update_pid=UpdatePid}}. + +terminate(_Reason, Db) -> + Db#db.update_pid ! close, + couch_file:close(Db#db.fd). + +handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) -> + Updater ! {From, update_docs, DocActions, Options}, + {noreply, Db}; +handle_call(get_db, _From, Db) -> + {reply, {ok, Db}, Db}; +handle_call({db_updated, NewDb}, _From, _OldDb) -> + {reply, ok, NewDb}. + + +handle_cast(foo, Main) -> + {noreply, Main}. + +%%% Internal function %%% + +start_update_loop(Db) -> + update_loop(Db#db{update_pid=self()}). + +update_loop(Db) -> + receive + {OrigFrom, update_docs, DocActions, Options} -> + case (catch update_docs_int(Db, DocActions, Options)) of + {ok, Db2} -> + ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + gen_server:reply(OrigFrom, ok), + couch_db_update_notifier:notify({updated, Db2#db.name}), + update_loop(Db2); + conflict -> + gen_server:reply(OrigFrom, conflict), + update_loop(Db); + Error -> + exit(Error) % we crashed + end; + close -> + % terminate loop + exit(normal) + end. + +get_db(MainPid) -> + {ok, Db} = gen_server:call(MainPid, get_db), + Db. + +open_doc_revs_int(Db, Id, Revs, Options) -> + case get_full_doc_info(Db, Id) of + {ok, #full_doc_info{rev_tree=RevTree}} -> + {FoundRevs, MissingRevs} = + case Revs of + all -> + {couch_key_tree:get_all_leafs(RevTree), []}; + _ -> + case lists:member(latest, Options) of + true -> + couch_key_tree:get_key_leafs(RevTree, Revs); + false -> + couch_key_tree:get(RevTree, Revs) + end + end, + FoundResults = + lists:map(fun({Rev, Value, FoundRevPath}) -> + case Value of + 0 -> + % we have the rev in our list but know nothing about it + {{not_found, missing}, Rev}; + {IsDeleted, SummaryPtr} -> + {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)} + end + end, FoundRevs), + Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs], + {ok, Results}; + not_found when Revs == all -> + {ok, []}; + not_found -> + {ok, [{{not_found, missing}, Rev} || Rev <- Revs]} + end. + +open_doc_int(Db, ?LOCAL_DOC_PREFIX ++ _ = Id, _Options) -> + case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of + [{ok, {_, {Rev, BodyData}}}] -> + {ok, #doc{id=Id, revs=[integer_to_list(Rev)], body=BodyData}}; + [not_found] -> + {not_found, missing} + end; +open_doc_int(Db, #doc_info{id=Id,rev=Rev,deleted=IsDeleted,summary_pointer=Sp}=DocInfo, Options) -> + Doc = make_doc(Db, Id, IsDeleted, Sp, [Rev]), + {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}}; +open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) -> + #doc_info{deleted=IsDeleted,rev=Rev, summary_pointer=Sp} = DocInfo = + couch_doc:to_doc_info(FullDocInfo), + {[{_Rev,_Value, Revs}], []} = couch_key_tree:get(RevTree, [Rev]), + Doc = make_doc(Db, Id, IsDeleted, Sp, Revs), + {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}; +open_doc_int(Db, Id, Options) -> + case get_full_doc_info(Db, Id) of + {ok, FullDocInfo} -> + open_doc_int(Db, FullDocInfo, Options); + not_found -> + throw({not_found, missing}) + end. + +doc_meta_info(DocInfo, RevTree, Options) -> + case lists:member(revs_info, Options) of + false -> []; + true -> + {[RevPath],[]} = + couch_key_tree:get_full_key_paths(RevTree, [DocInfo#doc_info.rev]), + [{revs_info, [{Rev, Deleted} || {Rev, {Deleted, _Sp0}} <- RevPath]}] + end ++ + case lists:member(conflicts, Options) of + false -> []; + true -> + case DocInfo#doc_info.conflict_revs of + [] -> []; + _ -> [{conflicts, DocInfo#doc_info.conflict_revs}] + end + end ++ + case lists:member(deleted_conflicts, Options) of + false -> []; + true -> + case DocInfo#doc_info.deleted_conflict_revs of + [] -> []; + _ -> [{deleted_conflicts, DocInfo#doc_info.deleted_conflict_revs}] + end + end. + +% rev tree functions + +doc_to_tree(Doc) -> + doc_to_tree(Doc, lists:reverse(Doc#doc.revs)). + +doc_to_tree(Doc, [RevId]) -> + [{RevId, Doc, []}]; +doc_to_tree(Doc, [RevId | Rest]) -> + [{RevId, [], doc_to_tree(Doc, Rest)}]. + +make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) -> + {BodyData, BinValues} = + case SummaryPointer of + nil -> + {[], []}; + _ -> + {ok, {BodyData0, BinValues0}} = couch_stream:read_term(Db#db.summary_stream, SummaryPointer), + {BodyData0, [{Name, {Type, {Db#db.fd, Sp, Len}}} || {Name, {Type, Sp, Len}} <- BinValues0]} + end, + #doc{ + id = Id, + revs = RevisionPath, + body = BodyData, + attachments = BinValues, + deleted = Deleted + }. + +flush_trees(_Db, [], AccFlushedTrees) -> + {ok, lists:reverse(AccFlushedTrees)}; +flush_trees(Db, [Unflushed | RestUnflushed], AccFlushed) -> + Flushed = couch_key_tree:map( + fun(_Rev, Value) -> + case Value of + #doc{attachments=Atts,deleted=IsDeleted}=Doc -> + % this node value is actually an unwritten document summary, + % write to disk. + + % convert bins, removing the FD. + % All bins should have been flushed to disk already. + Bins = [{BinName, {BinType, BinSp, BinLen}} || {BinName, {BinType, {_Fd, BinSp, BinLen}}} <- Atts], + {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}), + {IsDeleted, NewSummaryPointer}; + _ -> + Value + end + end, Unflushed), + flush_trees(Db, RestUnflushed, [Flushed | AccFlushed]). + +merge_rev_trees(_NoConflicts, [], [], AccNewTrees) -> + {ok, lists:reverse(AccNewTrees)}; +merge_rev_trees(NoConflicts, [NewDocs | RestDocsList], + [OldTree | RestOldTrees], AccNewTrees) -> + UpdatesRevTree = lists:foldl( + fun(NewDoc, AccTree) -> + couch_key_tree:merge(AccTree, doc_to_tree(NewDoc)) + end, + [], NewDocs), + NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree), + if NoConflicts andalso OldTree == [] -> + OldConflicts = couch_key_tree:count_leafs(OldTree), + NewConflicts = couch_key_tree:count_leafs(NewRevTree), + if NewConflicts > OldConflicts -> + throw(conflict); + true -> ok + end; + true -> ok + end, + merge_rev_trees(NoConflicts, RestDocsList, RestOldTrees, [NewRevTree | AccNewTrees]). + +new_index_entries([], [], Seq, DocCount, DelCount, AccById, AccBySeq) -> + {ok, Seq, DocCount, DelCount, AccById, AccBySeq}; +new_index_entries([Id|RestIds], [RevTree|RestTrees], Seq0, DocCount, DelCount, AccById, AccBySeq) -> + Seq = Seq0 + 1, + FullDocInfo = #full_doc_info{id=Id, update_seq=Seq, rev_tree=RevTree}, + #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo), + {DocCount2, DelCount2} = + if Deleted -> {DocCount, DelCount + 1}; + true -> {DocCount + 1, DelCount} + end, + new_index_entries(RestIds, RestTrees, Seq, DocCount2, DelCount2, [FullDocInfo|AccById], [DocInfo|AccBySeq]). + +update_docs_int(Db, DocsList, Options) -> + #db{ + docinfo_by_Id_btree = DocInfoByIdBTree, + docinfo_by_seq_btree = DocInfoBySeqBTree, + last_update_seq = LastSeq, + doc_count = FullDocCount, + doc_del_count = FullDelCount + } = Db, + + % separate out the NonRep documents from the rest of the documents + {DocsList2, NonRepDocs} = lists:foldl( + fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) -> + case Id of + ?LOCAL_DOC_PREFIX ++ _ when Rest==[] -> + % when saving NR (non rep) documents, you can only save a single rev + {DocsListAcc, [Doc | NonRepDocsAcc]}; + Id-> + {[Docs | DocsListAcc], NonRepDocsAcc} + end + end, {[], []}, DocsList), + + Ids = [Id || [#doc{id=Id}|_] <- DocsList2], + + % lookup up the existing documents, if they exist. + OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids), + OldDocTrees = lists:map( + fun({ok, #full_doc_info{rev_tree=OldRevTree}}) -> + OldRevTree; + (not_found) -> + [] + end, + OldDocLookups), + + {OldCount, OldDelCount} = lists:foldl( + fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) -> + case couch_doc:to_doc_info(FullDocInfo) of + #doc_info{deleted=false} -> + {OldCountAcc + 1, OldDelCountAcc}; + _ -> + {OldCountAcc , OldDelCountAcc + 1} + end; + (not_found, Acc) -> + Acc + end, {0, 0}, OldDocLookups), + + % Merge the new docs into the revision trees. + NoConflicts = lists:member(no_conflicts, Options), + {ok, NewRevTrees} = merge_rev_trees(NoConflicts, DocsList2, OldDocTrees, []), + + RemoveSeqs = [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups], + + % All regular documents are now ready to write. + + % Try to write the local documents first, a conflict might be generated + {ok, Db2} = update_local_docs(Db, NonRepDocs), + + % Write out the documents summaries (they are stored in the nodes of the rev trees) + {ok, FlushedRevTrees} = flush_trees(Db2, NewRevTrees, []), + + {ok, NewSeq, NewDocsCount, NewDelCount, InfoById, InfoBySeq} = + new_index_entries(Ids, FlushedRevTrees, LastSeq, 0, 0, [], []), + + % and the indexes to the documents + {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs), + {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []), + + Db3 = Db2#db{ + docinfo_by_Id_btree = DocInfoByIdBTree2, + docinfo_by_seq_btree = DocInfoBySeqBTree2, + last_update_seq = NewSeq, + doc_count = FullDocCount + NewDocsCount - OldCount, + doc_del_count = FullDelCount + NewDelCount - OldDelCount + }, + + case lists:member(delay_commit, Options) of + true -> + {ok, Db3}; + false -> + commit_outstanding(Db3) + end. + +update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> + Ids = [Id || #doc{id=Id} <- Docs], + OldDocLookups = couch_btree:lookup(Btree, Ids), + BtreeEntries = lists:zipwith( + fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) -> + BasedOnRev = + case Revs of + [] -> 0; + [RevStr|_] -> list_to_integer(RevStr) - 1 + end, + OldRev = + case OldDocLookup of + {ok, {_, {OldRev0, _}}} -> OldRev0; + not_found -> 0 + end, + case OldRev == BasedOnRev of + true -> + case Delete of + false -> {update, {Id, {OldRev+1, Body}}}; + true -> {remove, Id} + end; + false -> + throw(conflict) + end + + end, Docs, OldDocLookups), + + BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], + BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries], + + {ok, Btree2} = + couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), + + {ok, Db#db{local_docs_btree = Btree2}}. + + + +commit_outstanding(#db{fd=Fd, header=Header} = Db) -> + ok = couch_file:sync(Fd), % commit outstanding data + Header2 = Header#db_header{ + last_update_seq = Db#db.last_update_seq, + summary_stream_state = couch_stream:get_state(Db#db.summary_stream), + docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), + docinfo_by_Id_btree_state = couch_btree:get_state(Db#db.docinfo_by_Id_btree), + local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), + doc_count = Db#db.doc_count, + doc_del_count = Db#db.doc_del_count + }, + ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2), + ok = couch_file:sync(Fd), % commit header to disk + Db2 = Db#db{ + header = Header2 + }, + {ok, Db2}. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_info(_Info, State) -> + {noreply, State}. + + diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl new file mode 100644 index 00000000..51ee7af2 --- /dev/null +++ b/src/couchdb/couch_db.hrl @@ -0,0 +1,56 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-define(LOCAL_DOC_PREFIX, "_local/"). +-define(DESIGN_DOC_PREFIX0, "_design"). +-define(DESIGN_DOC_PREFIX, "_design/"). + +-define(DEFAULT_ATTACHMENT_CONTENT_TYPE, "application/octet-stream"). + +-record(doc_info, + { + id = "", + rev = "", + update_seq = 0, + summary_pointer = nil, + conflict_revs = [], + deleted_conflict_revs = [], + deleted = false + }). + +-record(full_doc_info, + {id = "", + update_seq = 0, + rev_tree = [] + }). + +-record(doc, + { + id = "", + revs = [], % in the form [{RevId, IsAvailable}, ...] + + % the json body object. + body = {obj, []}, + + % each attachment contains: + % {data, Type, <<binary>>} + % or: + % {pointer, Type, {FileHandle, StreamPointer, Length}} + attachments = [], + + deleted = false, + + % key/value tuple of meta information, provided when using special options: + % couch_db:open_doc(Db, Id, Options). + meta = [] + }). + diff --git a/src/couchdb/couch_db_update_notifier.erl b/src/couchdb/couch_db_update_notifier.erl new file mode 100644 index 00000000..96354620 --- /dev/null +++ b/src/couchdb/couch_db_update_notifier.erl @@ -0,0 +1,66 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +% +% This causes an OS process to spawned and it is notified every time a database +% is updated. +% +% The notifications are in the form of a the database name sent as a line of +% text to the OS processes stdout. +% + +-module(couch_db_update_notifier). + +-behaviour(gen_event). + +-export([start_link/1, notify/1]). +-export([init/1, terminate/2, handle_event/2, handle_call/2, handle_info/2, code_change/3,stop/1]). + +-define(ERR_HANDLE, {Port, {exit_status, Status}} -> {stop, {unknown_error, Status}, {unknown_error, Status}, Port}). + +start_link(Exec) -> + couch_event_sup:start_link(couch_db_update, {couch_db_update_notifier, make_ref()}, Exec). + +notify(Event) -> + gen_event:notify(couch_db_update, Event). + +stop(Pid) -> + couch_event_sup:stop(Pid). + +init(Exec) when is_list(Exec) -> % an exe + Port = open_port({spawn, Exec}, [stream, exit_status, hide]), + {ok, Port}; +init(Else) -> + {ok, Else}. + +terminate(_Reason, _Port) -> + ok. + +handle_event(Event, Fun) when is_function(Fun, 1) -> + Fun(Event), + {ok, Fun}; +handle_event(Event, {Fun, FunAcc}) -> + FunAcc2 = Fun(Event, FunAcc), + {ok, {Fun, FunAcc2}}; +handle_event({EventAtom, DbName}, Port) -> + Obj = {obj, [{type, atom_to_list(EventAtom)}, {db, DbName}]}, + true = port_command(Port, cjson:encode(Obj) ++ "\n"), + {ok, Port}. + +handle_call(_Request, State) -> + {ok, ok, State}. + +handle_info({'EXIT', _, _Reason}, _Port) -> + remove_handler. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl new file mode 100644 index 00000000..a9ef55f7 --- /dev/null +++ b/src/couchdb/couch_doc.erl @@ -0,0 +1,199 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_doc). + +-export([get_view_functions/1, is_special_doc/1,to_doc_info/1]). +-export([bin_foldl/3,bin_size/1,bin_to_binary/1]). +-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]). + +-include("couch_db.hrl"). + +to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs=Revs,meta=Meta}=Doc,Options)-> + {obj, [{"_id", Id}] ++ + case Revs of + [] -> []; + _ -> [{"_rev", lists:nth(1, Revs)}] + end ++ + case Del of + false -> + {obj, BodyProps} = Body, + BodyProps; + true -> + [{"_deleted", true}] + end ++ + case lists:member(revs, Options) of + false -> []; + true -> + [{"_revs", list_to_tuple(Revs)}] + end ++ + lists:map( + fun({revs_info, RevsInfo}) -> + JsonRevsInfo = + [{obj, [{rev, Rev}, {status, atom_to_list(Status)}]} || + {Rev, Status} <- RevsInfo], + {"_revs_info", list_to_tuple(JsonRevsInfo)}; + ({conflicts, Conflicts}) -> + {"_conflicts", list_to_tuple(Conflicts)}; + ({deleted_conflicts, Conflicts}) -> + {"_deleted_conflicts", list_to_tuple(Conflicts)} + end, Meta) ++ + case lists:member(attachments, Options) of + true -> % return the full rev list and the binaries as strings. + BinProps = lists:map( + fun({Name, {Type, BinValue}}) -> + {Name, {obj, [{"content-type", Type}, + {"data", couch_util:encodeBase64(bin_to_binary(BinValue))}]}} + end, + Doc#doc.attachments), + case BinProps of + [] -> []; + _ -> [{"_attachments", {obj, BinProps}}] + end; + false -> + BinProps = lists:map( + fun({Name, {Type, BinValue}}) -> + {Name, {obj, [{"stub", true}, {"content-type", Type}, + {"length", bin_size(BinValue)}]}} + end, + Doc#doc.attachments), + case BinProps of + [] -> []; + _ -> [{"_attachments", {obj, BinProps}}] + end + end + }. + +from_json_obj({obj, Props}) -> + {obj,JsonBins} = proplists:get_value("_attachments", Props, {obj, []}), + Bins = lists:flatmap(fun({Name, {obj, BinProps}}) -> + case proplists:get_value("stub", BinProps) of + true -> + [{Name, stub}]; + _ -> + Value = proplists:get_value("data", BinProps), + Type = proplists:get_value("content-type", BinProps, + ?DEFAULT_ATTACHMENT_CONTENT_TYPE), + [{Name, {Type, couch_util:decodeBase64(Value)}}] + end + end, JsonBins), + AllowedSpecialMembers = ["id", "revs", "rev", "attachments", "revs_info", + "conflicts", "deleted_conflicts", "deleted"], + [case lists:member(Name, AllowedSpecialMembers) of + true -> + ok; + false -> + throw({doc_validation, io_lib:format("Bad special document member: _~s", [Name])}) + end + || {[$_|Name], _Value} <- Props], + Revs = + case tuple_to_list(proplists:get_value("_revs", Props, {})) of + [] -> + case proplists:get_value("_rev", Props) of + undefined -> []; + Rev -> [Rev] + end; + Revs0 -> + Revs0 + end, + #doc{ + id = proplists:get_value("_id", Props, ""), + revs = Revs, + deleted = proplists:get_value("_deleted", Props, false), + body = {obj, [{Key, Value} || {[FirstChar|_]=Key, Value} <- Props, FirstChar /= $_]}, + attachments = Bins + }. + + +to_doc_info(#full_doc_info{id=Id,update_seq=Seq,rev_tree=Tree}) -> + LeafRevs = couch_key_tree:get_all_leafs(Tree), + SortedLeafRevs = + lists:sort(fun({RevIdA, {IsDeletedA, _}, PathA}, {RevIdB, {IsDeletedB, _}, PathB}) -> + % sort descending by {not deleted, then Depth, then RevisionId} + A = {not IsDeletedA, length(PathA), RevIdA}, + B = {not IsDeletedB, length(PathB), RevIdB}, + A > B + end, + LeafRevs), + + [{RevId, {IsDeleted, SummaryPointer}, _Path} | Rest] = SortedLeafRevs, + + {ConflictRevTuples, DeletedConflictRevTuples} = + lists:splitwith(fun({_ConflictRevId, {IsDeleted1, _SummaryPointer}, _}) -> + not IsDeleted1 + end, Rest), + + ConflictRevs = [RevId1 || {RevId1, _, _} <- ConflictRevTuples], + DeletedConflictRevs = [RevId2 || {RevId2, _, _} <- DeletedConflictRevTuples], + + #doc_info{ + id=Id, + update_seq=Seq, + rev = RevId, + summary_pointer = SummaryPointer, + conflict_revs = ConflictRevs, + deleted_conflict_revs = DeletedConflictRevs, + deleted = IsDeleted + }. + +is_special_doc(?DESIGN_DOC_PREFIX ++ _ ) -> + true; +is_special_doc(#doc{id=Id}) -> + is_special_doc(Id); +is_special_doc(_) -> + false. + +bin_foldl(Bin, Fun, Acc) when is_binary(Bin) -> + case Fun(Bin, Acc) of + {ok, Acc2} -> {ok, Acc2}; + {done, Acc2} -> {ok, Acc2} + end; +bin_foldl({Fd, Sp, Len}, Fun, Acc) -> + {ok, Acc2, _Sp2} = couch_stream:foldl(Fd, Sp, Len, Fun, Acc), + {ok, Acc2}. + +bin_size(Bin) when is_binary(Bin) -> + size(Bin); +bin_size({_Fd, _Sp, Len}) -> + Len. + +bin_to_binary(Bin) when is_binary(Bin) -> + Bin; +bin_to_binary({Fd, Sp, Len}) -> + {ok, Bin, _Sp2} = couch_stream:read(Fd, Sp, Len), + Bin. + +get_view_functions(#doc{body={obj, Fields}}) -> + Lang = proplists:get_value("language", Fields, "text/javascript"), + {obj, Views} = proplists:get_value("views", Fields, {obj, []}), + {Lang, [{ViewName, Value} || {ViewName, Value} <- Views, is_list(Value)]}; +get_view_functions(_Doc) -> + none. + +has_stubs(#doc{attachments=Bins}) -> + has_stubs(Bins); +has_stubs([]) -> + false; +has_stubs([{_Name, stub}|_]) -> + true; +has_stubs([_Bin|Rest]) -> + has_stubs(Rest). + +merge_stubs(#doc{attachments=MemBins}=StubsDoc, #doc{attachments=DiskBins}) -> + BinDict = dict:from_list(DiskBins), + MergedBins = lists:map( + fun({Name, stub}) -> + {Name, dict:fetch(Name, BinDict)}; + ({Name, Value}) -> + {Name, Value} + end, MemBins), + StubsDoc#doc{attachments= MergedBins}. diff --git a/src/couchdb/couch_erl_driver.c b/src/couchdb/couch_erl_driver.c new file mode 100644 index 00000000..b5703f09 --- /dev/null +++ b/src/couchdb/couch_erl_driver.c @@ -0,0 +1,160 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); you may not use +this file except in compliance with the License. You may obtain a copy of the +License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed +under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. + +*/ + +// This file is the C port driver for Erlang. It provides a low overhead +// means of calling into C code, however unlike the Fabric engine, coding +// errors in this module can crash the entire Erlang server. + +#include "erl_driver.h" +#include "unicode/ucol.h" +#include "unicode/ucasemap.h" +#ifndef WIN32 +#include <string.h> // for memcpy +#endif + +typedef struct { + ErlDrvPort port; + UCollator* collNoCase; + UCollator* coll; +} couch_drv_data; + +static void couch_drv_stop(ErlDrvData data) +{ + couch_drv_data* pData = (couch_drv_data*)data; + if (pData->coll) { + ucol_close(pData->coll); + } + if (pData->collNoCase) { + ucol_close(pData->collNoCase); + } + driver_free((char*)pData); +} + +static ErlDrvData couch_drv_start(ErlDrvPort port, char *buff) +{ + UErrorCode status = U_ZERO_ERROR; + couch_drv_data* pData = (couch_drv_data*)driver_alloc(sizeof(couch_drv_data)); + + if (pData == NULL) + return ERL_DRV_ERROR_GENERAL; + + pData->port = port; + pData->coll = NULL; + pData->collNoCase = NULL; + pData->coll = ucol_open("", &status); + + if (U_FAILURE(status)) { + couch_drv_stop((ErlDrvData)pData); + return ERL_DRV_ERROR_GENERAL; + } + + pData->collNoCase = ucol_open("", &status); + if (U_FAILURE(status)) { + couch_drv_stop((ErlDrvData)pData); + return ERL_DRV_ERROR_GENERAL; + } + + ucol_setAttribute(pData->collNoCase, UCOL_STRENGTH, UCOL_PRIMARY, &status); + if (U_FAILURE(status)) { + couch_drv_stop((ErlDrvData)pData); + return ERL_DRV_ERROR_GENERAL; + } + + return (ErlDrvData)pData; +} + +static int return_control_result(void* pLocalResult, int localLen, char **ppRetBuf, int returnLen) +{ + if (*ppRetBuf == NULL || localLen > returnLen) { + *ppRetBuf = (char*)driver_alloc_binary(localLen); + if(*ppRetBuf == NULL) { + return -1; + } + } + memcpy(*ppRetBuf, pLocalResult, localLen); + return localLen; +} + +static int couch_drv_control(ErlDrvData drv_data, unsigned int command, const char *pBuf, + int bufLen, char **rbuf, int rlen) +{ + #define COLLATE 0 + #define COLLATE_NO_CASE 1 + + couch_drv_data* pData = (couch_drv_data*)drv_data; + + UErrorCode status = U_ZERO_ERROR; + int collResult; + char response; + UCharIterator iterA; + UCharIterator iterB; + int32_t length; + + // 2 strings are in the buffer, consecutively + // The strings begin first with a 32 bit integer byte length, then the actual + // string bytes follow. + + // first 32bits are the length + memcpy(&length, pBuf, sizeof(length)); + pBuf += sizeof(length); + + // point the iterator at it. + uiter_setUTF8(&iterA, pBuf, length); + + pBuf += length; // now on to string b + + // first 32bits are the length + memcpy(&length, pBuf, sizeof(length)); + pBuf += sizeof(length); + + // point the iterator at it. + uiter_setUTF8(&iterB, pBuf, length); + + if (command == COLLATE) + collResult = ucol_strcollIter(pData->coll, &iterA, &iterB, &status); + else if (command == COLLATE_NO_CASE) + collResult = ucol_strcollIter(pData->collNoCase, &iterA, &iterB, &status); + else + return -1; + + if (collResult < 0) + response = 0; //lt + else if (collResult > 0) + response = 1; //gt + else + response = 2; //eq + + return return_control_result(&response, sizeof(response), rbuf, rlen); +} + +ErlDrvEntry couch_driver_entry = { + NULL, /* F_PTR init, N/A */ + couch_drv_start, /* L_PTR start, called when port is opened */ + couch_drv_stop, /* F_PTR stop, called when port is closed */ + NULL, /* F_PTR output, called when erlang has sent */ + NULL, /* F_PTR ready_input, called when input descriptor ready */ + NULL, /* F_PTR ready_output, called when output descriptor ready */ + "couch_erl_driver", /* char *driver_name, the argument to open_port */ + NULL, /* F_PTR finish, called when unloaded */ + NULL, /* Not used */ + couch_drv_control, /* F_PTR control, port_command callback */ + NULL, /* F_PTR timeout, reserved */ + NULL /* F_PTR outputv, reserved */ +}; + +DRIVER_INIT(couch_erl_driver) /* must match name in driver_entry */ +{ + return &couch_driver_entry; +} diff --git a/src/couchdb/couch_event_sup.erl b/src/couchdb/couch_event_sup.erl new file mode 100644 index 00000000..72b17e5d --- /dev/null +++ b/src/couchdb/couch_event_sup.erl @@ -0,0 +1,69 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +%% The purpose of this module is to allow event handlers to particpate in Erlang +%% supervisor trees. It provide a monitorable process that crashes if the event +%% handler fails. The process, when shutdown, deregisters the event handler. + +-module(couch_event_sup). +-behaviour(gen_server). + +-include("couch_db.hrl"). + +-export([start_link/3,start_link/4, stop/1]). +-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]). + +% +% Instead calling the +% ok = gen_event:add_sup_handler(error_logger, my_log, Args) +% +% do this: +% {ok, LinkedPid} = couch_event_sup:start_link(error_logger, my_log, Args) +% +% The benefit is the event is now part of the process tree, and can be +% started, restarted and shutdown consistently like the rest of the server +% components. +% +% And now if the "event" crashes, the supervisor is notified and can restart +% the event handler. +% +% Use this form to named process: +% {ok, LinkedPid} = couch_event_sup:start_link({local, my_log}, error_logger, my_log, Args) +% + +start_link(EventMgr, EventHandler, Args) -> + gen_server:start_link(couch_event_sup, {EventMgr, EventHandler, Args}, []). + +start_link(ServerName, EventMgr, EventHandler, Args) -> + gen_server:start_link(ServerName, couch_event_sup, {EventMgr, EventHandler, Args}, []). + +stop(Pid) -> + gen_server:cast(Pid, stop). + +init({EventMgr, EventHandler, Args}) -> + ok = gen_event:add_sup_handler(EventMgr, EventHandler, Args), + {ok, {EventMgr, EventHandler}}. + +terminate(_Reason, _State) -> + ok. + +handle_call(_Whatever, _From, State) -> + {ok, State}. + +handle_cast(stop, State) -> + {stop, normal, State}. + +handle_info({gen_event_EXIT, _Handler, Reason}, State) -> + {stop, Reason, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl new file mode 100644 index 00000000..6cbad44a --- /dev/null +++ b/src/couchdb/couch_file.erl @@ -0,0 +1,323 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_file). +-behaviour(gen_server). + +-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header + +-export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]). +-export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]). +-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]). + +%%---------------------------------------------------------------------- +%% Args: Valid Options are [create] and [create,overwrite]. +%% Files are opened in read/write mode. +%% Returns: On success, {ok, Fd} +%% or {error, Reason} if the file could not be opened. +%%---------------------------------------------------------------------- + +open(Filepath) -> + open(Filepath, []). + +open(Filepath, Options) -> + case gen_server:start_link(couch_file, {Filepath, Options, self()}, []) of + {ok, FdPid} -> + % we got back an ok, but that doesn't really mean it was successful. + % Instead the true status has been sent back to us as a message. + % We do this because if the gen_server doesn't initialize properly, + % it generates a crash report that will get logged. This avoids + % that mess, because we don't want crash reports generated + % every time a file cannot be found. + receive + {FdPid, ok} -> + {ok, FdPid}; + {FdPid, Error} -> + Error + end; + Error -> + Error + end. + + +%%---------------------------------------------------------------------- +%% Args: Pos is the offset from the beginning of the file, Bytes is +%% is the number of bytes to read. +%% Returns: {ok, Binary} where Binary is a binary data from disk +%% or {error, Reason}. +%%---------------------------------------------------------------------- + +pread(Fd, Pos, Bytes) when Bytes > 0 -> + gen_server:call(Fd, {pread, Pos, Bytes}). + + +%%---------------------------------------------------------------------- +%% Args: Pos is the offset from the beginning of the file, Bin is +%% is the binary to write +%% Returns: ok +%% or {error, Reason}. +%%---------------------------------------------------------------------- + +pwrite(Fd, Pos, Bin) -> + gen_server:call(Fd, {pwrite, Pos, Bin}). + +%%---------------------------------------------------------------------- +%% Purpose: To append a segment of zeros to the end of the file. +%% Args: Bytes is the number of bytes to append to the file. +%% Returns: {ok, Pos} where Pos is the file offset to the beginning of +%% the new segments. +%% or {error, Reason}. +%%---------------------------------------------------------------------- + +expand(Fd, Bytes) when Bytes > 0 -> + gen_server:call(Fd, {expand, Bytes}). + + +%%---------------------------------------------------------------------- +%% Purpose: To append an Erlang term to the end of the file. +%% Args: Erlang term to serialize and append to the file. +%% Returns: {ok, Pos} where Pos is the file offset to the beginning the +%% serialized term. Use pread_term to read the term back. +%% or {error, Reason}. +%%---------------------------------------------------------------------- + +append_term(Fd, Term) -> + gen_server:call(Fd, {append_term, Term}). + + +%%---------------------------------------------------------------------- +%% Purpose: Reads a term from a file that was written with append_term +%% Args: Pos, the offset into the file where the term is serialized. +%% Returns: {ok, Term} +%% or {error, Reason}. +%%---------------------------------------------------------------------- + +pread_term(Fd, Pos) -> + gen_server:call(Fd, {pread_term, Pos}). + + +%%---------------------------------------------------------------------- +%% Purpose: The length of a file, in bytes. +%% Returns: {ok, Bytes} +%% or {error, Reason}. +%%---------------------------------------------------------------------- + +% length in bytes +bytes(Fd) -> + gen_server:call(Fd, bytes). + +%%---------------------------------------------------------------------- +%% Purpose: Truncate a file to the number of bytes. +%% Returns: ok +%% or {error, Reason}. +%%---------------------------------------------------------------------- + +truncate(Fd, Pos) -> + gen_server:call(Fd, {truncate, Pos}). + +%%---------------------------------------------------------------------- +%% Purpose: Ensure all bytes written to the file are flushed to disk. +%% Returns: ok +%% or {error, Reason}. +%%---------------------------------------------------------------------- + +sync(Fd) -> + gen_server:call(Fd, sync). + +%%---------------------------------------------------------------------- +%% Purpose: Close the file. Is performed asynchronously. +%% Returns: ok +%%---------------------------------------------------------------------- +close(Fd) -> + gen_server:cast(Fd, close). + + +write_header(Fd, Prefix, Data) -> + % The leading bytes in every db file, the sig and the file version: + %the actual header data + TermBin = term_to_binary(Data), + % the size of all the bytes written to the header, including the md5 signature (16 bytes) + FilledSize = size(Prefix) + size(TermBin) + 16, + case FilledSize > ?HEADER_SIZE of + true -> + % too big! + {error, error_header_too_large}; + false -> + % pad out the header with zeros, then take the md5 hash + PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize))>>, + Sig = erlang:md5([TermBin, PadZeros]), + % now we assemble the final header binary and write to disk + WriteBin = <<Prefix/binary, TermBin/binary, PadZeros/binary, Sig/binary>>, + ?HEADER_SIZE = size(WriteBin), % sanity check + DblWriteBin = [WriteBin, WriteBin], + ok = pwrite(Fd, 0, DblWriteBin) + end. + + +read_header(Fd, Prefix) -> + {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)), + <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin, + % read the first header + case extract_header(Prefix, Bin1) of + {ok, Header1} -> + case extract_header(Prefix, Bin2) of + {ok, Header2} -> + case Header1 == Header2 of + true -> + % Everything is completely normal! + {ok, Header1}; + false -> + % To get here we must have two different header versions with signatures intact. + % It's weird but possible (a commit failure right at the 2k boundary). Log it and take the first. + couch_log:info("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]), + {ok, Header1} + end; + {error, Error} -> + % error reading second header. It's ok, but log it. + couch_log:info("Secondary header corruption (error: ~p). Using primary header.", [Error]), + {ok, Header1} + end; + {error, Error} -> + % error reading primary header + case extract_header(Prefix, Bin2) of + {ok, Header2} -> + % log corrupt primary header. It's ok since the secondary is still good. + couch_log:info("Primary header corruption (error: ~p). Using secondary header.", [Error]), + {ok, Header2}; + _ -> + % error reading secondary header too + % return the error, no need to log anything as the caller will be responsible for dealing with the error. + {error, Error} + end + end. + + +extract_header(Prefix, Bin) -> + SizeOfPrefix = size(Prefix), + SizeOfTermBin = ?HEADER_SIZE - + SizeOfPrefix - + 16, % md5 sig + + <<HeaderPrefix:SizeOfPrefix/binary, TermBin:SizeOfTermBin/binary, Sig:16/binary>> = Bin, + + % check the header prefix + case HeaderPrefix of + Prefix -> + % check the integrity signature + case erlang:md5(TermBin) == Sig of + true -> + Header = binary_to_term(TermBin), + {ok, Header}; + false -> + {error, header_corrupt} + end; + _ -> + {error, unknown_header_type} + end. + + + +init_status_ok(ReturnPid, Fd) -> + ReturnPid ! {self(), ok}, % signal back ok + {ok, Fd}. + +init_status_error(ReturnPid, Error) -> + ReturnPid ! {self(), Error}, % signal back error status + self() ! self_close, % tell ourself to close async + {ok, nil}. + +% server functions + +init({Filepath, Options, ReturnPid}) -> + case lists:member(create, Options) of + true -> + filelib:ensure_dir(Filepath), + case file:open(Filepath, [read, write, raw, binary]) of + {ok, Fd} -> + {ok, Length} = file:position(Fd, eof), + case Length > 0 of + true -> + % this means the file already exists and has data. + % FYI: We don't differentiate between empty files and non-existant + % files here. + case lists:member(overwrite, Options) of + true -> + {ok, 0} = file:position(Fd, 0), + ok = file:truncate(Fd), + init_status_ok(ReturnPid, Fd); + false -> + ok = file:close(Fd), + init_status_error(ReturnPid, {error, file_exists}) + end; + false -> + init_status_ok(ReturnPid, Fd) + end; + Error -> + init_status_error(ReturnPid, Error) + end; + false -> + % open in read mode first, so we don't create the file if it doesn't exist. + case file:open(Filepath, [read, raw]) of + {ok, Fd_Read} -> + {ok, Fd} = file:open(Filepath, [read, write, raw, binary]), + ok = file:close(Fd_Read), + init_status_ok(ReturnPid, Fd); + Error -> + init_status_error(ReturnPid, Error) + end + end. + + +terminate(_Reason, nil) -> + ok; +terminate(_Reason, Fd) -> + file:close(Fd), + ok. + + +handle_call({pread, Pos, Bytes}, _From, Fd) -> + {reply, file:pread(Fd, Pos, Bytes), Fd}; +handle_call({pwrite, Pos, Bin}, _From, Fd) -> + {reply, file:pwrite(Fd, Pos, Bin), Fd}; +handle_call({expand, Num}, _From, Fd) -> + {ok, Pos} = file:position(Fd, eof), + {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd}; +handle_call(bytes, _From, Fd) -> + {reply, file:position(Fd, eof), Fd}; +handle_call(sync, _From, Fd) -> + {reply, file:sync(Fd), Fd}; +handle_call({truncate, Pos}, _From, Fd) -> + {ok, Pos} = file:position(Fd, Pos), + {reply, file:truncate(Fd), Fd}; +handle_call({append_term, Term}, _From, Fd) -> + Bin = term_to_binary(Term, [compressed]), + TermLen = size(Bin), + Bin2 = <<TermLen:32, Bin/binary>>, + {ok, Pos} = file:position(Fd, eof), + {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd}; +handle_call({pread_term, Pos}, _From, Fd) -> + {ok, <<TermLen:32>>} + = file:pread(Fd, Pos, 4), + {ok, Bin} = file:pread(Fd, Pos + 4, TermLen), + {reply, {ok, binary_to_term(Bin)}, Fd}. + + +handle_cast(close, Fd) -> + {stop,normal,Fd}. % causes terminate to be called + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_info(self_close, State) -> + {stop,normal,State}; +handle_info(_Info, State) -> + {noreply, State}. diff --git a/src/couchdb/couch_ft_query.erl b/src/couchdb/couch_ft_query.erl new file mode 100644 index 00000000..2d1b9fc5 --- /dev/null +++ b/src/couchdb/couch_ft_query.erl @@ -0,0 +1,78 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_ft_query). +-behaviour(gen_server). + +-export([start_link/1, execute/2]). + +-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3, stop/0]). + +-define(ERR_HANDLE, {Port, {exit_status, Status}} -> {stop, {unknown_error, Status}, {unknown_error, Status}, Port}). + +start_link(QueryExec) -> + gen_server:start_link({local, couch_ft_query}, couch_ft_query, QueryExec, []). + +stop() -> + exit(whereis(couch_ft_query), close). + +execute(DatabaseName, QueryString) -> + gen_server:call(couch_ft_query, {ft_query, DatabaseName, QueryString}). + +init(QueryExec) -> + Port = open_port({spawn, QueryExec}, [{line, 1000}, exit_status, hide]), + {ok, Port}. + +terminate(_Reason, _Server) -> + ok. + +handle_call({ft_query, Database, QueryText}, _From, Port) -> + %% send the database name + true = port_command(Port, Database ++ "\n"), + true = port_command(Port, QueryText ++ "\n"), + case get_line(Port) of + "ok" -> + DocIds = read_query_results(Port, []), + {reply, {ok, DocIds}, Port}; + "error" -> + ErrorId = get_line(Port), + ErrorMsg = get_line(Port), + {reply, {list_to_atom(ErrorId), ErrorMsg}, Port} + end. + +read_query_results(Port, Acc) -> + case get_line(Port) of + "" -> % line by itself means all done + lists:reverse(Acc); + DocId -> + Score = get_line(Port), + read_query_results(Port, [{DocId, Score} | Acc]) + end. + + +get_line(Port) -> + receive + {Port, {data, {eol, Line}}} -> + Line; + ?ERR_HANDLE + end. + +handle_cast(_Whatever, State) -> + {noreply, State}. + +handle_info({Port, {exit_status, Status}}, Port) -> + {stop, {os_process_exited, Status}, Port}; +handle_info(_Whatever, State) -> + {noreply, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/couchdb/couch_js.c b/src/couchdb/couch_js.c new file mode 100644 index 00000000..a234fda9 --- /dev/null +++ b/src/couchdb/couch_js.c @@ -0,0 +1,452 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); you may not use +this file except in compliance with the License. You may obtain a copy of the +License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed +under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. + +*/ + +#include <stdio.h> +#include <jsapi.h> + +int gExitCode = 0; +int gStackChunkSize = 8L * 1024L; + +int +EncodeChar(uint8 *utf8Buffer, uint32 ucs4Char) { + int utf8Length = 1; + + if (ucs4Char < 0x80) { + *utf8Buffer = (uint8)ucs4Char; + } else { + int i; + uint32 a = ucs4Char >> 11; + utf8Length = 2; + while (a) { + a >>= 5; + utf8Length++; + } + i = utf8Length; + while (--i) { + utf8Buffer[i] = (uint8)((ucs4Char & 0x3F) | 0x80); + ucs4Char >>= 6; + } + *utf8Buffer = (uint8)(0x100 - (1 << (8-utf8Length)) + ucs4Char); + } + return utf8Length; +} + +JSBool +EncodeString(const jschar *src, size_t srclen, char *dst, size_t *dstlenp) { + size_t i, utf8Len, dstlen = *dstlenp, origDstlen = dstlen; + jschar c, c2; + uint32 v; + uint8 utf8buf[6]; + + if (!dst) + dstlen = origDstlen = (size_t) -1; + + while (srclen) { + c = *src++; + srclen--; + if ((c >= 0xDC00) && (c <= 0xDFFF)) + goto badSurrogate; + if (c < 0xD800 || c > 0xDBFF) { + v = c; + } else { + if (srclen < 1) + goto bufferTooSmall; + c2 = *src++; + srclen--; + if ((c2 < 0xDC00) || (c2 > 0xDFFF)) { + c = c2; + goto badSurrogate; + } + v = ((c - 0xD800) << 10) + (c2 - 0xDC00) + 0x10000; + } + if (v < 0x0080) { + /* no encoding necessary - performance hack */ + if (!dstlen) + goto bufferTooSmall; + if (dst) + *dst++ = (char) v; + utf8Len = 1; + } else { + utf8Len = EncodeChar(utf8buf, v); + if (utf8Len > dstlen) + goto bufferTooSmall; + if (dst) { + for (i = 0; i < utf8Len; i++) + *dst++ = (char) utf8buf[i]; + } + } + dstlen -= utf8Len; + } + *dstlenp = (origDstlen - dstlen); + return JS_TRUE; + +badSurrogate: + *dstlenp = (origDstlen - dstlen); + return JS_FALSE; + +bufferTooSmall: + *dstlenp = (origDstlen - dstlen); + return JS_FALSE; +} + +static uint32 +DecodeChar(const uint8 *utf8Buffer, int utf8Length) { + uint32 ucs4Char; + uint32 minucs4Char; + /* from Unicode 3.1, non-shortest form is illegal */ + static const uint32 minucs4Table[] = { + 0x00000080, 0x00000800, 0x0001000, 0x0020000, 0x0400000 + }; + + if (utf8Length == 1) { + ucs4Char = *utf8Buffer; + } else { + ucs4Char = *utf8Buffer++ & ((1<<(7-utf8Length))-1); + minucs4Char = minucs4Table[utf8Length-2]; + while (--utf8Length) { + ucs4Char = ucs4Char<<6 | (*utf8Buffer++ & 0x3F); + } + if (ucs4Char < minucs4Char || + ucs4Char == 0xFFFE || ucs4Char == 0xFFFF) { + ucs4Char = 0xFFFD; + } + } + return ucs4Char; +} + +JSBool +DecodeString(const char *src, size_t srclen, jschar *dst, size_t *dstlenp) { + uint32 v; + size_t offset = 0, j, n, dstlen = *dstlenp, origDstlen = dstlen; + + if (!dst) + dstlen = origDstlen = (size_t) -1; + + while (srclen) { + v = (uint8) *src; + n = 1; + if (v & 0x80) { + while (v & (0x80 >> n)) + n++; + if (n > srclen) + goto bufferTooSmall; + if (n == 1 || n > 6) + goto badCharacter; + for (j = 1; j < n; j++) { + if ((src[j] & 0xC0) != 0x80) + goto badCharacter; + } + v = DecodeChar((const uint8 *) src, n); + if (v >= 0x10000) { + v -= 0x10000; + if (v > 0xFFFFF || dstlen < 2) { + *dstlenp = (origDstlen - dstlen); + return JS_FALSE; + } + if (dstlen < 2) + goto bufferTooSmall; + if (dst) { + *dst++ = (jschar)((v >> 10) + 0xD800); + v = (jschar)((v & 0x3FF) + 0xDC00); + } + dstlen--; + } + } + if (!dstlen) + goto bufferTooSmall; + if (dst) + *dst++ = (jschar) v; + dstlen--; + offset += n; + src += n; + srclen -= n; + } + *dstlenp = (origDstlen - dstlen); + return JS_TRUE; + +badCharacter: + *dstlenp = (origDstlen - dstlen); + return JS_FALSE; + +bufferTooSmall: + *dstlenp = (origDstlen - dstlen); + return JS_FALSE; +} + +static JSBool +EvalInContext(JSContext *context, JSObject *obj, uintN argc, jsval *argv, + jsval *rval) { + JSString *str; + JSObject *sandbox; + JSContext *sub_context; + const jschar *src; + size_t srclen; + JSBool ok; + jsval v; + + sandbox = NULL; + if (!JS_ConvertArguments(context, argc, argv, "S / o", &str, &sandbox)) + return JS_FALSE; + + sub_context = JS_NewContext(JS_GetRuntime(context), gStackChunkSize); + if (!sub_context) { + JS_ReportOutOfMemory(context); + return JS_FALSE; + } + + src = JS_GetStringChars(str); + srclen = JS_GetStringLength(str); + + if (!sandbox) { + sandbox = JS_NewObject(sub_context, NULL, NULL, NULL); + if (!sandbox || !JS_InitStandardClasses(sub_context, sandbox)) { + ok = JS_FALSE; + goto out; + } + } + + if (srclen == 0) { + *rval = OBJECT_TO_JSVAL(sandbox); + ok = JS_TRUE; + } else { + ok = JS_EvaluateUCScript(sub_context, sandbox, src, srclen, NULL, -1, + rval); + } + +out: + JS_DestroyContext(sub_context); + return ok; +} + +static JSBool +GC(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) { + JS_GC(context); + return JS_TRUE; +} + +static JSBool +Print(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) { + uintN i, n; + size_t cl, bl; + JSString *str; + jschar *chars; + char *bytes; + + for (i = n = 0; i < argc; i++) { + str = JS_ValueToString(context, argv[i]); + if (!str) + return JS_FALSE; + chars = JS_GetStringChars(str); + cl = JS_GetStringLength(str); + if (!EncodeString(chars, cl, NULL, &bl)) + return JS_FALSE; + bytes = JS_malloc(context, bl + 1); + bytes[bl] = '\0'; + if (!EncodeString(chars, cl, bytes, &bl)) { + JS_free(context, bytes); + return JS_FALSE; + } + fprintf(stdout, "%s%s", i ? " " : "", bytes); + JS_free(context, bytes); + } + n++; + if (n) + fputc('\n', stdout); + fflush(stdout); + return JS_TRUE; +} + +static JSBool +Quit(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) { + JS_ConvertArguments(context, argc, argv, "/ i", &gExitCode); + return JS_FALSE; +} + +static JSBool +ReadLine(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) { + char *bytes, *tmp; + jschar *chars; + size_t bufsize, byteslen, charslen, readlen; + JSString *str; + + JS_MaybeGC(context); + + byteslen = 0; + bufsize = 256; + bytes = JS_malloc(context, bufsize); + if (!bytes) + return JS_FALSE; + + while ((readlen = js_fgets(bytes + byteslen, bufsize - byteslen, stdin)) > 0) { + byteslen += readlen; + + /* Are we done? */ + if (bytes[byteslen - 1] == '\n') { + bytes[byteslen - 1] = '\0'; + break; + } + + /* Else, grow our buffer for another pass */ + tmp = JS_realloc(context, bytes, bufsize * 2); + if (!tmp) { + JS_free(context, bytes); + return JS_FALSE; + } + + bufsize *= 2; + bytes = tmp; + } + + /* Treat the empty string specially */ + if (byteslen == 0) { + *rval = JS_GetEmptyStringValue(context); + JS_free(context, bytes); + return JS_TRUE; + } + + /* Shrink the buffer to the real size */ + tmp = JS_realloc(context, bytes, byteslen); + if (!tmp) { + JS_free(context, bytes); + return JS_FALSE; + } + bytes = tmp; + + /* Decode the string from UTF-8 */ + if (!DecodeString(bytes, byteslen, NULL, &charslen)) { + JS_free(context, bytes); + return JS_FALSE; + } + chars = JS_malloc(context, (charslen + 1) * sizeof(jschar)); + if (!DecodeString(bytes, byteslen, chars, &charslen)) { + JS_free(context, bytes); + JS_free(context, chars); + return JS_FALSE; + } + chars[charslen] = '\0'; + + /* Initialize a JSString object */ + str = JS_NewUCString(context, chars, charslen - 1); + if (!str) { + JS_free(context, bytes); + JS_free(context, chars); + return JS_FALSE; + } + + *rval = STRING_TO_JSVAL(str); + return JS_TRUE; +} + +static JSBool +Seal(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) { + JSObject *target; + JSBool deep = JS_FALSE; + + if (!JS_ConvertArguments(context, argc, argv, "o/b", &target, &deep)) + return JS_FALSE; + if (!target) + return JS_TRUE; + return JS_SealObject(context, target, deep); +} + +static void +ExecuteScript(JSContext *context, JSObject *obj, const char *filename) { + FILE *file; + JSScript *script; + jsval result; + + if (!filename || strcmp(filename, "-") == 0) { + file = stdin; + } else { + file = fopen(filename, "r"); + if (!file) { + fprintf(stderr, "could not open script file %s\n", filename); + gExitCode = 1; + return; + } + } + + script = JS_CompileFileHandle(context, obj, filename, file); + if (script) { + JS_ExecuteScript(context, obj, script, &result); + JS_DestroyScript(context, script); + } +} + +static uint32 gBranchCount = 0; +static uint32 gBranchLimit = 100 * 1024; + +static JSBool +BranchCallback(JSContext *context, JSScript *script) { + if (++gBranchCount == gBranchLimit) { + gBranchCount = 0; + return JS_FALSE; + } + if ((gBranchCount & 0x3fff) == 1) { + JS_MaybeGC(context); + } + return JS_TRUE; +} + +static void +PrintError(JSContext *context, const char *message, JSErrorReport *report) { + if (!report || !JSREPORT_IS_WARNING(report->flags)) + fprintf(stderr, "%s\n", message); +} + +int +main(int argc, const char * argv[]) { + JSRuntime *runtime; + JSContext *context; + JSObject *global; + + runtime = JS_NewRuntime(64L * 1024L * 1024L); + if (!runtime) + return 1; + context = JS_NewContext(runtime, gStackChunkSize); + if (!context) + return 1; + JS_SetErrorReporter(context, PrintError); + JS_SetBranchCallback(context, BranchCallback); + JS_ToggleOptions(context, JSOPTION_NATIVE_BRANCH_CALLBACK); + JS_ToggleOptions(context, JSOPTION_XML); + + global = JS_NewObject(context, NULL, NULL, NULL); + if (!global) + return 1; + if (!JS_InitStandardClasses(context, global)) + return 1; + if (!JS_DefineFunction(context, global, "evalcx", EvalInContext, 0, 0) + || !JS_DefineFunction(context, global, "gc", GC, 0, 0) + || !JS_DefineFunction(context, global, "print", Print, 0, 0) + || !JS_DefineFunction(context, global, "quit", Quit, 0, 0) + || !JS_DefineFunction(context, global, "readline", ReadLine, 0, 0) + || !JS_DefineFunction(context, global, "seal", Seal, 0, 0)) + return 1; + + if (argc != 2) { + fprintf(stderr, "incorrect number of arguments\n\n"); + fprintf(stderr, "usage: %s <scriptfile>\n", argv[0]); + return 2; + } + + ExecuteScript(context, global, argv[1]); + + JS_DestroyContext(context); + JS_DestroyRuntime(runtime); + JS_ShutDown(); + + return gExitCode; +} diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl new file mode 100644 index 00000000..705365bd --- /dev/null +++ b/src/couchdb/couch_key_tree.erl @@ -0,0 +1,139 @@ +% Copyright 2007, 2008 Damien Katz <damien_katz@yahoo.com> +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. + +-module(couch_key_tree). + +-export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]). +-export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1]). + +% a key tree looks like this: +% Tree -> [] or [{Key, Value, Tree} | SiblingTree] +% ChildTree -> Tree +% SiblingTree -> [] or [{SiblingKey, Value, Tree} | Tree] +% And each Key < SiblingKey + + + +% key tree functions + +% When the same key is found in the trees, the value in tree B is discarded. +merge([], B) -> + B; +merge(A, []) -> + A; +merge([ATree | ANextTree], [BTree | BNextTree]) -> + {AKey, AValue, ASubTree} = ATree, + {BKey, _BValue, BSubTree} = BTree, + if + AKey == BKey -> + %same key + MergedSubTree = merge(ASubTree, BSubTree), + MergedNextTree = merge(ANextTree, BNextTree), + [{AKey, AValue, MergedSubTree} | MergedNextTree]; + AKey < BKey -> + [ATree | merge(ANextTree, [BTree | BNextTree])]; + true -> + [BTree | merge([ATree | ANextTree], BNextTree)] + end. + +find_missing(_Tree, []) -> + []; +find_missing([], Keys) -> + Keys; +find_missing([{Key, _, SubTree} | RestTree], Keys) -> + SrcKeys2 = Keys -- Key, + SrcKeys3 = find_missing(SubTree, SrcKeys2), + find_missing(RestTree, SrcKeys3). + + +% get the leafs in the tree matching the keys. The matching key nodes can be +% leafs or an inner nodes. If an inner node, then the leafs for that node +% are returned. +get_key_leafs(Tree, Keys) -> + get_key_leafs(Tree, Keys, []). + +get_key_leafs(_Tree, [], _KeyPathAcc) -> + {[], []}; +get_key_leafs([], KeysToGet, _KeyPathAcc) -> + {[], KeysToGet}; +get_key_leafs([{Key, _Value, SubTree}=Tree | RestTree], KeysToGet, KeyPathAcc) -> + case KeysToGet -- [Key] of + KeysToGet -> % same list, key not found + {LeafsFound, KeysToGet2} = get_key_leafs(SubTree, KeysToGet, [Key | KeyPathAcc]), + {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc), + {LeafsFound ++ RestLeafsFound, KeysRemaining}; + KeysToGet2 -> + LeafsFound = get_all_leafs([Tree], KeyPathAcc), + LeafKeysFound = [LeafKeyFound || {LeafKeyFound, _, _} <- LeafsFound], + KeysToGet2 = KeysToGet2 -- LeafKeysFound, + {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc), + {LeafsFound ++ RestLeafsFound, KeysRemaining} + end. + +get(Tree, KeysToGet) -> + {KeyPaths, KeysNotFound} = get_full_key_paths(Tree, KeysToGet), + FixedResults = [ {Key, Value, [Key0 || {Key0, _} <- Path]} || [{Key, Value}|_] = Path <- KeyPaths], + {FixedResults, KeysNotFound}. + +get_full_key_paths(Tree, Keys) -> + get_full_key_paths(Tree, Keys, []). + +get_full_key_paths(_Tree, [], _KeyPathAcc) -> + {[], []}; +get_full_key_paths([], KeysToGet, _KeyPathAcc) -> + {[], KeysToGet}; +get_full_key_paths([{KeyId, Value, SubTree} | RestTree], KeysToGet, KeyPathAcc) -> + KeysToGet2 = KeysToGet -- [KeyId], + CurrentNodeResult = + case length(KeysToGet2) == length(KeysToGet) of + true -> % not in the key list. + []; + false -> % this node is the key list. return it + [[{KeyId, Value} | KeyPathAcc]] + end, + {KeysGotten, KeysRemaining} = get_full_key_paths(SubTree, KeysToGet2, [{KeyId, Value} | KeyPathAcc]), + {KeysGotten2, KeysRemaining2} = get_full_key_paths(RestTree, KeysRemaining, KeyPathAcc), + {CurrentNodeResult ++ KeysGotten ++ KeysGotten2, KeysRemaining2}. + +get_all_leafs(Tree) -> + get_all_leafs(Tree, []). + +get_all_leafs([], _KeyPathAcc) -> + []; +get_all_leafs([{KeyId, Value, []} | RestTree], KeyPathAcc) -> + [{KeyId, Value, [KeyId | KeyPathAcc]} | get_all_leafs(RestTree, KeyPathAcc)]; +get_all_leafs([{KeyId, _Value, SubTree} | RestTree], KeyPathAcc) -> + get_all_leafs(SubTree, [KeyId | KeyPathAcc]) ++ get_all_leafs(RestTree, KeyPathAcc). + +get_leaf_keys([]) -> + []; +get_leaf_keys([{Key, _Value, []} | RestTree]) -> + [Key | get_leaf_keys(RestTree)]; +get_leaf_keys([{_Key, _Value, SubTree} | RestTree]) -> + get_leaf_keys(SubTree) ++ get_leaf_keys(RestTree). + +count_leafs([]) -> + 0; +count_leafs([{_Key, _Value, []} | RestTree]) -> + 1 + count_leafs(RestTree); +count_leafs([{_Key, _Value, SubTree} | RestTree]) -> + count_leafs(SubTree) + count_leafs(RestTree). + + +map(_Fun, []) -> + []; +map(Fun, [{Key, Value, SubTree} | RestTree]) -> + Value2 = Fun(Key, Value), + [{Key, Value2, map(Fun, SubTree)} | map(Fun, RestTree)]. + diff --git a/src/couchdb/couch_log.erl b/src/couchdb/couch_log.erl new file mode 100644 index 00000000..47e0114d --- /dev/null +++ b/src/couchdb/couch_log.erl @@ -0,0 +1,130 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_log). +-behaviour(gen_event). + +-export([start_link/2,stop/0]). +-export([error/1,error/2,info/1,info/2,debug/1,debug/2,get_level/0,get_level_integer/0, set_level/1]). +-export([init/1, handle_event/2, terminate/2, code_change/3, handle_info/2, handle_call/2]). + +-define(LEVEL_ERROR, 3). +-define(LEVEL_INFO, 2). +-define(LEVEL_DEBUG, 1). +-define(LEVEL_TMI, 0). + +level_integer(error) -> ?LEVEL_ERROR; +level_integer(info) -> ?LEVEL_INFO; +level_integer(debug) -> ?LEVEL_DEBUG; +level_integer(tmi) -> ?LEVEL_TMI; +level_integer(_Else) -> ?LEVEL_ERROR. % anything else default to ERROR level + +level_atom(?LEVEL_ERROR) -> error; +level_atom(?LEVEL_INFO) -> info; +level_atom(?LEVEL_DEBUG) -> debug; +level_atom(?LEVEL_TMI) -> tmi. + + +start_link(Filename, Level) -> + couch_event_sup:start_link({local, couch_log}, error_logger, couch_log, {Filename, Level}). + +stop() -> + couch_event_sup:stop(couch_log). + +init({Filename, Level}) -> + {ok, Fd} = file:open(Filename, [append]), + {ok, {Fd, level_integer(Level)}}. + +error(Msg) -> + error("~s", [Msg]). + +error(Format, Args) -> + error_logger:error_report(couch_error, {Format, Args}). + +info(Msg) -> + info("~s", [Msg]). + +info(Format, Args) -> + case get_level_integer() =< ?LEVEL_INFO of + true -> + error_logger:info_report(couch_info, {Format, Args}); + false -> + ok + end. + +debug(Msg) -> + debug("~s", [Msg]). + +debug(Format, Args) -> + case get_level_integer() =< ?LEVEL_DEBUG of + true -> + error_logger:info_report(couch_debug, {Format, Args}); + false -> + ok + end. + +set_level(LevelAtom) -> + set_level_integer(level_integer(LevelAtom)). + +get_level() -> + level_atom(get_level_integer()). + +get_level_integer() -> + catch gen_event:call(error_logger, couch_log, get_level_integer). + +set_level_integer(Int) -> + gen_event:call(error_logger, couch_log, {set_level_integer, Int}). + +handle_event({error_report, _, {Pid, couch_error, {Format, Args}}}, {Fd, _LogLevel}=State) -> + log(Fd, Pid, error, Format, Args), + {ok, State}; +handle_event({error_report, _, {Pid, _, _}}=Event, {Fd, _LogLevel}=State) -> + log(Fd, Pid, error, "~p", [Event]), + {ok, State}; +handle_event({error, _, {Pid, Format, Args}}, {Fd, _LogLevel}=State) -> + log(Fd, Pid, error, Format, Args), + {ok, State}; +handle_event({info_report, _, {Pid, couch_info, {Format, Args}}}, {Fd, LogLevel}=State) +when LogLevel =< ?LEVEL_INFO -> + log(Fd, Pid, info, Format, Args), + {ok, State}; +handle_event({info_report, _, {Pid, couch_debug, {Format, Args}}}, {Fd, LogLevel}=State) +when LogLevel =< ?LEVEL_DEBUG -> + log(Fd, Pid, debug, Format, Args), + {ok, State}; +handle_event({_, _, {Pid, _, _}}=Event, {Fd, LogLevel}=State) +when LogLevel =< ?LEVEL_TMI -> + % log every remaining event if tmi! + log(Fd, Pid, tmi, "~p", [Event]), + {ok, State}; +handle_event(_Event, State) -> + {ok, State}. + +handle_call(get_level_integer, {_Fd, LogLevel}=State) -> + {ok, LogLevel, State}; +handle_call({set_level_integer, NewLevel}, {Fd, _LogLevel}) -> + {ok, ok, {Fd, NewLevel}}. + +handle_info(_Info, State) -> + {ok, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Arg, {Fd, _LoggingLevel}) -> + file:close(Fd). + +log(Fd, Pid, Level, Format, Args) -> + Msg = io_lib:format(Format, Args), + ok = io:format("[~s] [~p] ~s~n", [Level, Pid, Msg]), % dump to console too + {ok, Msg2, _} = regexp:gsub(lists:flatten(Msg),"\\r\\n|\\r|\\n", "\r\n"), + ok = io:format(Fd, "[~s] [~s] [~p] ~s\r~n\r~n", [httpd_util:rfc1123_date(), Level, Pid, Msg2]). diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl new file mode 100644 index 00000000..19cba9bd --- /dev/null +++ b/src/couchdb/couch_query_servers.erl @@ -0,0 +1,206 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_query_servers). +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]). +-export([start_doc_map/2, map_docs/2, stop_doc_map/1]). + +-export([test/0, test/1]). + +-include("couch_db.hrl"). + +timeout() -> + % hardcoded 5 sec timeout per document + 5000. + +start_link(QueryServerList) -> + gen_server:start_link({local, couch_query_servers}, couch_query_servers, QueryServerList, []). + +stop() -> + exit(whereis(couch_query_servers), close). + +readline(Port) -> + readline(Port, []). + +readline(Port, Acc) -> + Timer = erlang:send_after(timeout(), self(), timeout), + Result = + receive + {Port, {data, {noeol, Data}}} -> + readline(Port, [Data|Acc]); + {Port, {data, {eol, Data}}} -> + lists:flatten(lists:reverse(Acc, Data)); + {Port, Err} -> + catch port_close(Port), + erlang:cancel_timer(Timer), + throw({map_process_error, Err}); + timeout -> + catch port_close(Port), + throw({map_process_error, "map function timed out"}) + end, + case erlang:cancel_timer(Timer) of + false -> + % message already sent. clear it + receive timeout -> ok end; + _ -> + ok + end, + Result. + +read_json(Port) -> + case cjson:decode(readline(Port)) of + {obj, [{"log", Msg}]} when is_list(Msg) -> + % we got a message to log. Log it and continue + couch_log:info("Query Server Log Message: ~s", [Msg]), + read_json(Port); + Else -> + Else + end. + +writeline(Port, String) -> + true = port_command(Port, String ++ "\n"). + +% send command and get a response. +prompt(Port, Json) -> + writeline(Port, cjson:encode(Json)), + read_json(Port). + + +start_doc_map(Lang, Functions) -> + Port = + case gen_server:call(couch_query_servers, {get_port, Lang}) of + {ok, Port0} -> + link(Port0), + Port0; + {empty, Cmd} -> + couch_log:info("Spawning new ~s instance.", [Lang]), + open_port({spawn, Cmd}, [stream, + {line, 1000}, + exit_status, + hide]); + Error -> + throw(Error) + end, + true = prompt(Port, {"reset"}), + % send the functions as json strings + lists:foreach(fun(FunctionSource) -> + case prompt(Port, {"add_fun", FunctionSource}) of + true -> ok; + {obj, [{"error", Id}, {"reason", Reason}]} -> + throw({Id, Reason}) + end + end, + Functions), + {ok, {Lang, Port}}. + +map_docs({_Lang, Port}, Docs) -> + % send the documents + Results = + lists:map( + fun(Doc) -> + Json = couch_doc:to_json_obj(Doc, []), + case prompt(Port, {"map_doc", Json}) of + {obj, [{"error", Id}, {"reason", Reason}]} -> + throw({list_to_atom(Id),Reason}); + {obj, [{"reason", Reason}, {"error", Id}]} -> + throw({list_to_atom(Id),Reason}); + Results when is_tuple(Results) -> + % the results are a json array of function map yields like this: + % {FunResults1, FunResults2 ...} + % where funresults is are json arrays of key value pairs: + % {{Key1, Value1}, {Key2, Value2}} + % Convert to real lists, execept the key, value pairs + [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)] + end + end, + Docs), + {ok, Results}. + + +stop_doc_map(nil) -> + ok; +stop_doc_map({Lang, Port}) -> + ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}), + true = unlink(Port), + ok. + +init(QueryServerList) -> + {ok, {QueryServerList, []}}. + +terminate(_Reason, _Server) -> + ok. + + +handle_call({get_port, Lang}, {FromPid, _}, {QueryServerList, LangPorts}) -> + case lists:keysearch(Lang, 1, LangPorts) of + {value, {_, Port}=LangPort} -> + Result = + case catch port_connect(Port, FromPid) of + true -> + true = unlink(Port), + {ok, Port}; + Error -> + catch port_close(Port), + Error + end, + {reply, Result, {QueryServerList, LangPorts -- [LangPort]}}; + false -> + case lists:keysearch(Lang, 1, QueryServerList) of + {value, {_, ServerCmd}} -> + {reply, {empty, ServerCmd}, {QueryServerList, LangPorts}}; + false -> % not a supported language + {reply, {query_language_unknown, Lang}, {QueryServerList, LangPorts}} + end + end; +handle_call({return_port, {Lang, Port}}, _From, {QueryServerList, LangPorts}) -> + case catch port_connect(Port, self()) of + true -> + {reply, ok, {QueryServerList, [{Lang, Port} | LangPorts]}}; + _ -> + catch port_close(Port), + {reply, ok, {QueryServerList, LangPorts}} + end. + +handle_cast(_Whatever, {Cmd, Ports}) -> + {noreply, {Cmd, Ports}}. + +handle_info({Port, {exit_status, Status}}, {QueryServerList, LangPorts}) -> + case lists:keysearch(Port, 2, LangPorts) of + {value, {Lang, _}} -> + case Status of + 0 -> ok; + _ -> couch_log:error("Abnormal shutdown of ~s query server process (exit_status: ~w).", [Lang, Status]) + end, + {noreply, {QueryServerList, lists:keydelete(Port, 2, LangPorts)}}; + _ -> + couch_log:error("Unknown linked port/process crash: ~p", [Port]) + end; +handle_info(_Whatever, {Cmd, Ports}) -> + {noreply, {Cmd, Ports}}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +test() -> + test("../js/js -f main.js"). + +test(Cmd) -> + start_link(Cmd), + {ok, DocMap} = start_doc_map("javascript", ["function(doc) {if (doc[0] == 'a') return doc[1];}"]), + {ok, Results} = map_docs(DocMap, [#doc{body={"a", "b"}}, #doc{body={"c", "d"}},#doc{body={"a", "c"}}]), + io:format("Results: ~w~n", [Results]), + stop_doc_map(DocMap), + ok. diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl new file mode 100644 index 00000000..9590d5c1 --- /dev/null +++ b/src/couchdb/couch_rep.erl @@ -0,0 +1,308 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep). + +-include("couch_db.hrl"). + +-export([replicate/2, replicate/3, test/0, test_write_docs/3]). + +-record(stats, { + docs_read=0, + read_errors=0, + docs_copied=0, + copy_errors=0 + }). + + +url_encode([H|T]) -> + if + H >= $a, $z >= H -> + [H|url_encode(T)]; + H >= $A, $Z >= H -> + [H|url_encode(T)]; + H >= $0, $9 >= H -> + [H|url_encode(T)]; + H == $_; H == $.; H == $-; H == $: -> + [H|url_encode(T)]; + true -> + case lists:flatten(io_lib:format("~.16.0B", [H])) of + [X, Y] -> + [$%, X, Y | url_encode(T)]; + [X] -> + [$%, $0, X | url_encode(T)] + end + end; +url_encode([]) -> + []. + + +replicate(DbNameA, DbNameB) -> + replicate(DbNameA, DbNameB, []). + +replicate(Source, Target, Options) -> + {ok, DbSrc} = open_db(Source), + {ok, DbTgt} = open_db(Target), + {ok, HostName} = inet:gethostname(), + + RepRecKey = ?LOCAL_DOC_PREFIX ++ HostName ++ ":" ++ Source ++ ":" ++ Target, + StartTime = httpd_util:rfc1123_date(), + RepRecSrc = + case open_doc(DbSrc, RepRecKey, []) of + {ok, SrcDoc} -> SrcDoc; + _ -> #doc{id=RepRecKey} + end, + + RepRecTgt = + case open_doc(DbTgt, RepRecKey, []) of + {ok, TgtDoc} -> TgtDoc; + _ -> #doc{id=RepRecKey} + end, + + #doc{body={obj,OldRepHistoryProps}} = RepRecSrc, + #doc{body={obj,OldRepHistoryPropsTrg}} = RepRecTgt, + + SeqNum0 = + case OldRepHistoryProps == OldRepHistoryPropsTrg of + true -> + % if the records are identical, then we have a valid replication history + proplists:get_value("source_last_seq", OldRepHistoryProps, 0); + false -> + 0 + end, + + SeqNum = + case proplists:get_value(full, Options, false) + orelse proplists:get_value("full", Options, false) of + true -> 0; + false -> SeqNum0 + end, + + {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum, #stats{}), + case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of + true -> + % nothing changed, don't record results + {ok, {obj, OldRepHistoryProps}}; + false -> + HistEntries =[ + {obj, + [{"start_time", StartTime}, + {"end_time", httpd_util:rfc1123_date()}, + {"start_last_seq", SeqNum}, + {"end_last_seq", NewSeqNum}, + {"docs_read", Stats#stats.docs_read}, + {"read_errors", Stats#stats.read_errors}, + {"docs_copied", Stats#stats.docs_copied}, + {"copy_errors", Stats#stats.copy_errors}]} + | tuple_to_list(proplists:get_value("history", OldRepHistoryProps, {}))], + % something changed, record results + NewRepHistory = + {obj, + [{"session_id", couch_util:new_uuid()}, + {"source_last_seq", NewSeqNum}, + {"history", list_to_tuple(lists:sublist(HistEntries, 50))}]}, + + {ok, _} = update_doc(DbSrc, RepRecSrc#doc{body=NewRepHistory}, []), + {ok, _} = update_doc(DbTgt, RepRecTgt#doc{body=NewRepHistory}, []), + {ok, NewRepHistory} + end. + +pull_rep(DbTarget, DbSource, SourceSeqNum, Stats) -> + {ok, NewSeq} = + enum_docs_since(DbSource, SourceSeqNum, + fun(#doc_info{update_seq=Seq}=SrcDocInfo, _, {_, AccStats}) -> + Stats2 = maybe_save_docs(DbTarget, DbSource, SrcDocInfo, AccStats), + {ok, {Seq, Stats2}} + end, {SourceSeqNum, Stats}), + NewSeq. + + +maybe_save_docs(DbTarget, DbSource, + #doc_info{id=Id, rev=Rev, conflict_revs=Conflicts, deleted_conflict_revs=DelConflicts}, + Stats) -> + SrcRevs = [Rev | Conflicts] ++ DelConflicts, + {ok, [{Id, MissingRevs}]} = get_missing_revs(DbTarget, [{Id, SrcRevs}]), + + case MissingRevs of + [] -> + Stats; + _Else -> + % the 'ok' below validates no unrecoverable errors (like network failure, etc). + {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]), + + Docs = [RevDoc || {ok, RevDoc} <- DocResults], % only match successful loads + + Stats2 = Stats#stats{ + docs_read=Stats#stats.docs_read + length(Docs), + read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs)}, + + case Docs of + [] -> + Stats2; + _ -> + % the 'ok' below validates no unrecoverable errors (like network failure, etc). + ok = save_docs(DbTarget, Docs, []), + Stats2#stats{docs_copied=Stats2#stats.docs_copied+length(Docs)} + end + end. + + +do_http_request(Url, Action) -> + do_http_request(Url, Action, []). + +do_http_request(Url, Action, JsonBody) -> + couch_log:debug("couch_rep HTTP client request:"), + couch_log:debug("\tAction: ~p", [Action]), + couch_log:debug("\tUrl: ~p", [Url]), + Request = + case JsonBody of + [] -> + {Url, []}; + _ -> + {Url, [], "application/json; charset=utf-8", lists:flatten(cjson:encode(JsonBody))} + end, + {ok, {{_, ResponseCode,_},_Headers, ResponseBody}} = http:request(Action, Request, [], []), + if + ResponseCode >= 200, ResponseCode < 500 -> + cjson:decode(ResponseBody) + end. + +enum_docs0(_InFun, [], Acc) -> + Acc; +enum_docs0(InFun, [DocInfo | Rest], Acc) -> + case InFun(DocInfo, 0, Acc) of + {ok, Acc2} -> enum_docs0(InFun, Rest, Acc2); + {stop, Acc2} -> Acc2 + end. + +open_db("http" ++ DbName)-> + case lists:last(DbName) of + $/ -> + {ok, "http" ++ DbName}; + _ -> + {ok, "http" ++ DbName ++ "/"} + end; +open_db(DbName)-> + couch_server:open(DbName). + + +enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) -> + Url = DbUrl ++ "_all_docs_by_seq?startkey=" ++ integer_to_list(StartSeq), + {obj, Results} = do_http_request(Url, get), + DocInfoList= + lists:map(fun({obj, RowInfoList}) -> + {obj, RowValueProps} = proplists:get_value("value", RowInfoList), + #doc_info{ + id=proplists:get_value("id", RowInfoList), + rev=proplists:get_value("rev", RowValueProps), + update_seq = proplists:get_value("key", RowInfoList), + conflict_revs = + tuple_to_list(proplists:get_value("conflicts", RowValueProps, {})), + deleted_conflict_revs = + tuple_to_list(proplists:get_value("deleted_conflicts", RowValueProps, {})), + deleted = proplists:get_value("deleted", RowValueProps, false)} + end, tuple_to_list(proplists:get_value("rows", Results))), + {ok, enum_docs0(InFun, DocInfoList, InAcc)}; +enum_docs_since(DbSource, StartSeq, Fun, Acc) -> + couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc). + +get_missing_revs(DbUrl, DocIdRevsList) when is_list(DbUrl) -> + JsonDocIdRevsList = {obj, + [{Id, list_to_tuple(RevList)} || {Id, RevList} <- DocIdRevsList]}, + {obj, ResponseMembers} = + do_http_request(DbUrl ++ "_missing_revs", + post, JsonDocIdRevsList), + {obj, DocMissingRevsList} = proplists:get_value("missing_revs", ResponseMembers), + {ok, [{Id, tuple_to_list(MissingRevs)} || {Id, MissingRevs} <- DocMissingRevsList]}; +get_missing_revs(Db, DocId) -> + couch_db:get_missing_revs(Db, DocId). + + +update_doc(DbUrl, #doc{id=DocId}=Doc, _Options) when is_list(DbUrl) -> + Url = DbUrl ++ url_encode(DocId), + {obj, ResponseMembers} = + do_http_request(Url, put, couch_doc:to_json_obj(Doc, [revs,attachments])), + RevId = proplists:get_value("_rev", ResponseMembers), + {ok, RevId}; +update_doc(Db, Doc, Options) -> + couch_db:update_doc(Db, Doc, Options). + +save_docs(_, [], _) -> + ok; +save_docs(DbUrl, Docs, []) when is_list(DbUrl) -> + JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], + {obj, Returned} = + do_http_request(DbUrl ++ "_bulk_docs", post, {obj, [{new_edits, false}, {docs, list_to_tuple(JsonDocs)}]}), + true = proplists:get_value("ok", Returned), + ok; +save_docs(Db, Docs, Options) -> + couch_db:save_docs(Db, Docs, Options). + + +open_doc(DbUrl, DocId, []) when is_list(DbUrl) -> + case do_http_request(DbUrl ++ url_encode(DocId), get) of + {obj, [{"error", ErrId}, {"reason", Reason}]} -> + {list_to_atom(ErrId), Reason}; + Doc -> + {ok, couch_doc:from_json_obj(Doc)} + end; +open_doc(Db, DocId, Options) when not is_list(Db) -> + couch_db:open_doc(Db, DocId, Options). + + +open_doc_revs(DbUrl, DocId, Revs, Options) when is_list(DbUrl) -> + QueryOptionStrs = + lists:map(fun(latest) -> + % latest is only option right now + "latest=true" + end, Options), + RevsQueryStrs = lists:flatten(cjson:encode(list_to_tuple(Revs))), + Url = DbUrl ++ DocId ++ "?" ++ couch_util:implode(["revs=true", "attachments=true", "open_revs=" ++ RevsQueryStrs ] ++ QueryOptionStrs, "&"), + JsonResults = do_http_request(Url, get, []), + Results = + lists:map( + fun({obj, [{"missing", Rev}]}) -> + {{not_found, missing}, Rev}; + ({obj, [{"ok", JsonDoc}]}) -> + {ok, couch_doc:from_json_obj(JsonDoc)} + end, tuple_to_list(JsonResults)), + {ok, Results}; +open_doc_revs(Db, DocId, Revs, Options) -> + couch_db:open_doc_revs(Db, DocId, Revs, Options). + + + + + +test() -> + couch_server:start(), + %{ok, LocalA} = couch_server:open("replica_a"), + {ok, LocalA} = couch_server:create("replica_a", [overwrite]), + {ok, _} = couch_server:create("replica_b", [overwrite]), + %DbA = "replica_a", + DbA = "http://localhost:5984/replica_a/", + %DbB = "replica_b", + DbB = "http://localhost:5984/replica_b/", + _DocUnids = test_write_docs(10, LocalA, []), + replicate(DbA, DbB), + %{ok, _Rev} = couch_db:delete_doc(LocalA, lists:nth(1, DocUnids), any), + % replicate(DbA, DbB), + ok. + +test_write_docs(0, _Db, Output) -> + lists:reverse(Output); +test_write_docs(N, Db, Output) -> + Doc = #doc{ + id=integer_to_list(N), + body={obj, [{"foo", integer_to_list(N)}, {"num", N}, {"bar", "blah"}]}}, + couch_db:save_doc(Db, Doc, []), + test_write_docs(N-1, Db, [integer_to_list(N) | Output]). diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl new file mode 100644 index 00000000..bb3617b2 --- /dev/null +++ b/src/couchdb/couch_server.erl @@ -0,0 +1,215 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_server). +-behaviour(gen_server). +-behaviour(application). + +-export([start/0,start/1,start/2,stop/0,stop/1]). +-export([open/1,create/2,delete/1,all_databases/0,get_version/0]). +-export([init/1, handle_call/3,sup_start_link/2]). +-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]). +-export([dev_start/0,remote_restart/0]). + +-include("couch_db.hrl"). + +-record(server,{ + root_dir = [], + dbname_regexp, + options=[] + }). + +start() -> + start(""). + +start(IniFile) when is_atom(IniFile) -> + couch_server_sup:start_link(atom_to_list(IniFile) ++ ".ini"); +start(IniNum) when is_integer(IniNum) -> + couch_server_sup:start_link("couch" ++ integer_to_list(IniNum) ++ ".ini"); +start(IniFile) -> + couch_server_sup:start_link(IniFile). + +start(_Type, _Args) -> + start(). + +stop() -> + couch_server_sup:stop(). + +stop(_Reason) -> + stop(). + +dev_start() -> + stop(), + up_to_date = make:all([load, debug_info]), + start(). + +get_version() -> + Apps = application:loaded_applications(), + case lists:keysearch(couch, 1, Apps) of + {value, {_, _, Vsn}} -> + Vsn; + false -> + "0.0.0" + end. + +sup_start_link(RootDir, Options) -> + gen_server:start_link({local, couch_server}, couch_server, {RootDir, Options}, []). + +open(Filename) -> + gen_server:call(couch_server, {open, Filename}). + +create(Filename, Options) -> + gen_server:call(couch_server, {create, Filename, Options}). + +delete(Filename) -> + gen_server:call(couch_server, {delete, Filename}). + +remote_restart() -> + gen_server:call(couch_server, remote_restart). + +init({RootDir, Options}) -> + {ok, RegExp} = regexp:parse("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"), + {ok, #server{root_dir=RootDir, dbname_regexp=RegExp, options=Options}}. + +check_filename(#server{dbname_regexp=RegExp}, Filename) -> + case regexp:match(Filename, RegExp) of + nomatch -> + {error, illegal_database_name}; + _Match -> + ok + end. + +get_full_filename(Server, Filename) -> + filename:join([Server#server.root_dir, "./" ++ Filename ++ ".couch"]). + + +terminate(_Reason, _Server) -> + ok. + +all_databases() -> + {ok, Root} = gen_server:call(couch_server, get_root), + Filenames = + filelib:fold_files(Root, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$", true, + fun(Filename, AccIn) -> + case Filename -- Root of + [$/ | RelativeFilename] -> ok; + RelativeFilename -> ok + end, + [filename:rootname(RelativeFilename, ".couch") | AccIn] + end, []), + {ok, Filenames}. + + +handle_call(get_root, _From, #server{root_dir=Root}=Server) -> + {reply, {ok, Root}, Server}; +handle_call({open, Filename}, From, Server) -> + case check_filename(Server, Filename) of + {error, Error} -> + {reply, {error, Error}, Server}; + ok -> + Filepath = get_full_filename(Server, Filename), + Result = supervisor:start_child(couch_server_sup, + {Filename, + {couch_db, open, [Filename, Filepath]}, + transient , + infinity, + supervisor, + [couch_db]}), + case Result of + {ok, Db} -> + {reply, {ok, Db}, Server}; + {error, already_present} -> + ok = supervisor:delete_child(couch_server_sup, Filename), + % call self recursively + handle_call({open, Filename}, From, Server); + {error, {already_started, Db}} -> + {reply, {ok, Db}, Server}; + {error, {not_found, _}} -> + {reply, not_found, Server}; + {error, {Error, _}} -> + {reply, {error, Error}, Server} + end + end; +handle_call({create, Filename, Options}, _From, Server) -> + case check_filename(Server, Filename) of + {error, Error} -> + {reply, {error, Error}, Server}; + ok -> + Filepath = get_full_filename(Server, Filename), + ChildSpec = {Filename, + {couch_db, create, [Filename, Filepath, Options]}, + transient, + infinity, + supervisor, + [couch_db]}, + Result = + case supervisor:delete_child(couch_server_sup, Filename) of + ok -> + sup_start_child(couch_server_sup, ChildSpec); + {error, not_found} -> + sup_start_child(couch_server_sup, ChildSpec); + {error, running} -> + % a server process for this database already started. Maybe kill it + case lists:member(overwrite, Options) of + true -> + supervisor:terminate_child(couch_server_sup, Filename), + ok = supervisor:delete_child(couch_server_sup, Filename), + sup_start_child(couch_server_sup, ChildSpec); + false -> + {error, database_already_exists} + end + end, + case Result of + {ok, _Db} -> couch_db_update_notifier:notify({created, Filename}); + _ -> ok + end, + {reply, Result, Server} + end; +handle_call({delete, Filename}, _From, Server) -> + FullFilepath = get_full_filename(Server, Filename), + supervisor:terminate_child(couch_server_sup, Filename), + supervisor:delete_child(couch_server_sup, Filename), + case file:delete(FullFilepath) of + ok -> + couch_db_update_notifier:notify({deleted, Filename}), + {reply, ok, Server}; + {error, enoent} -> + {reply, not_found, Server}; + Else -> + {reply, Else, Server} + end; +handle_call(remote_restart, _From, #server{options=Options}=Server) -> + case proplists:get_value(remote_restart, Options) of + true -> + exit(self(), restart); + _ -> + ok + end, + {reply, ok, Server}. + +% this function is just to strip out the child spec error stuff if hit +sup_start_child(couch_server_sup, ChildSpec) -> + case supervisor:start_child(couch_server_sup, ChildSpec) of + {error, {Error, _ChildInfo}} -> + {error, Error}; + Else -> + Else + end. + +handle_cast(_Msg, State) -> + {noreply,State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_info(_Info, State) -> + {noreply, State}. diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl new file mode 100644 index 00000000..8b9889e7 --- /dev/null +++ b/src/couchdb/couch_server_sup.erl @@ -0,0 +1,185 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_server_sup). +-behaviour(supervisor). + +-define(DEFAULT_INI, "couch.ini"). + +-export([start_link/1,stop/0]). + +%% supervisor callbacks +-export([init/1]). + +start_link(IniFilename) -> + case whereis(couch_server_sup) of + undefined -> + start_server(IniFilename); + _Else -> + {error, already_started} + end. + +start_server("") -> + % no ini file specified, check the command line args + IniFile = + case init:get_argument(couchini) of + {ok, [CmdLineIniFilename]} -> + CmdLineIniFilename; + _Else -> + ?DEFAULT_INI + end, + start_server(IniFile); +start_server(InputIniFilename) -> + + case init:get_argument(pidfile) of + {ok, [PidFile]} -> + case file:write_file(PidFile, os:getpid()) of + ok -> ok; + Error -> io:format("Failed to write PID file ~s, error: ~p", [PidFile, Error]) + end; + _ -> ok + end, + + {ok, Cwd} = file:get_cwd(), + IniFilename = couch_util:abs_pathname(InputIniFilename), + IniBin = + case file:read_file(IniFilename) of + {ok, IniBin0} -> + IniBin0; + {error, enoent} -> + Msg = io_lib:format("Couldn't find server configuration file ~s.", [InputIniFilename]), + io:format("~s~n", [Msg]), + throw({startup_error, Msg}) + end, + {ok, Ini} = couch_util:parse_ini(binary_to_list(IniBin)), + + ConsoleStartupMsg = proplists:get_value({"Couch", "ConsoleStartupMsg"}, Ini, "Apache CouchDB is starting."), + LogLevel = list_to_atom(proplists:get_value({"Couch", "LogLevel"}, Ini, "error")), + DbRootDir = proplists:get_value({"Couch", "DbRootDir"}, Ini, "."), + HttpConfigFile = proplists:get_value({"Couch", "HttpConfigFile"}, Ini, "couch_httpd.conf"), + LogFile = proplists:get_value({"Couch", "LogFile"}, Ini, "couchdb.log"), + UtilDriverDir = proplists:get_value({"Couch", "UtilDriverDir"}, Ini, ""), + UpdateNotifierExes = proplists:get_all_values({"Couch", "DbUpdateNotificationProcess"}, Ini), + FtSearchQueryServer = proplists:get_value({"Couch", "FullTextSearchQueryServer"}, Ini, ""), + RemoteRestart = list_to_atom(proplists:get_value({"Couch", "AllowRemoteRestart"}, Ini, "undefined")), + ServerOptions = [{remote_restart, RemoteRestart}], + QueryServers = [{Lang, QueryExe} || {{"Couch Query Servers", Lang}, QueryExe} <- Ini], + + ChildProcesses = + [{couch_log, + {couch_log, start_link, [LogFile, LogLevel]}, + permanent, + brutal_kill, + worker, + [couch_server]}, + {couch_db_update_event, + {gen_event, start_link, [{local, couch_db_update}]}, + permanent, + 1000, + supervisor, + dynamic}, + {couch_server, + {couch_server, sup_start_link, [DbRootDir, ServerOptions]}, + permanent, + brutal_kill, + worker, + [couch_server]}, + {couch_util, + {couch_util, start_link, [UtilDriverDir]}, + permanent, + brutal_kill, + worker, + [couch_util]}, + {couch_query_servers, + {couch_query_servers, start_link, [QueryServers]}, + permanent, + brutal_kill, + worker, + [couch_query_servers]}, + {couch_view, + {couch_view, start_link, [DbRootDir]}, + permanent, + brutal_kill, + worker, + [couch_view]}, + {httpd, + {httpd, start_link, [HttpConfigFile]}, + permanent, + 1000, + supervisor, + [httpd]} + ] ++ + lists:map(fun(UpdateNotifierExe) -> + {UpdateNotifierExe, + {couch_db_update_notifier, start_link, [UpdateNotifierExe]}, + permanent, + 1000, + supervisor, + [couch_db_update_notifier]} + end, UpdateNotifierExes) + ++ + case FtSearchQueryServer of + "" -> + []; + _ -> + [{couch_ft_query, + {couch_ft_query, start_link, [FtSearchQueryServer]}, + permanent, + 1000, + supervisor, + [httpd]}] + end, + + io:format("couch ~s (LogLevel=~s)~n", [couch_server:get_version(), LogLevel]), + io:format("~s~n", [ConsoleStartupMsg]), + + process_flag(trap_exit, true), + StartResult = (catch supervisor:start_link( + {local, couch_server_sup}, couch_server_sup, ChildProcesses)), + + ConfigInfo = io_lib:format("Config Info ~s:~n\tCurrentWorkingDir=~s~n" ++ + "\tDbRootDir=~s~n" ++ + "\tHttpConfigFile=~s~n" ++ + "\tLogFile=~s~n" ++ + "\tUtilDriverDir=~s~n" ++ + "\tDbUpdateNotificationProcesses=~s~n" ++ + "\tFullTextSearchQueryServer=~s~n" ++ + "~s", + [IniFilename, + Cwd, + DbRootDir, + HttpConfigFile, + LogFile, + UtilDriverDir, + UpdateNotifierExes, + FtSearchQueryServer, + [lists:flatten(io_lib:format("\t~s=~s~n", [Lang, QueryExe])) || {Lang, QueryExe} <- QueryServers]]), + couch_log:debug("~s", [ConfigInfo]), + + case StartResult of + {ok,_} -> + % only output when startup was successful + io:format("Apache CouchDB has started. Time to relax.~n"); + _ -> + % Since we failed startup, unconditionally dump configuration data to console + io:format("~s", [ConfigInfo]), + ok + end, + process_flag(trap_exit, false), + StartResult. + +stop() -> + catch exit(whereis(couch_server_sup), normal), + couch_log:stop(). + +init(ChildProcesses) -> + {ok, {{one_for_one, 10, 3600}, ChildProcesses}}. diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl new file mode 100644 index 00000000..d5157b4d --- /dev/null +++ b/src/couchdb/couch_stream.erl @@ -0,0 +1,252 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_stream). +-behaviour(gen_server). + +-export([test/1]). +-export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]). +-export([copy/4]). +-export([ensure_buffer/2, set_min_buffer/2]). +-export([init/1, terminate/2, handle_call/3]). +-export([handle_cast/2,code_change/3,handle_info/2]). + +-include("couch_db.hrl"). + +-define(FILE_POINTER_BYTES, 8). +-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)). + +-define(STREAM_OFFSET_BYTES, 4). +-define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)). + +-define(HUGE_CHUNK, 1000000000). % Huge chuck size when reading all in one go + +-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data + + +-record(write_stream, + {fd = 0, + current_pos = 0, + bytes_remaining = 0, + next_alloc = 0, + min_alloc = 16#00010000 + }). + +-record(stream, + { + pid, + fd + }). + + +%%% Interface functions %%% + +open(Fd) -> + open(nil, Fd). + +open(nil, Fd) -> + open({0,0}, Fd); +open(State, Fd) -> + {ok, Pid} = gen_server:start_link(couch_stream, {State, Fd}, []), + {ok, #stream{pid = Pid, fd = Fd}}. + +close(#stream{pid = Pid, fd = _Fd}) -> + gen_server:call(Pid, close). + +get_state(#stream{pid = Pid, fd = _Fd}) -> + gen_server:call(Pid, get_state). + +ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) -> + gen_server:call(Pid, {ensure_buffer, Bytes}). + +set_min_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) -> + gen_server:call(Pid, {set_min_buffer, Bytes}). + +read(#stream{pid = _Pid, fd = Fd}, Sp, Num) -> + read(Fd, Sp, Num); +read(Fd, Sp, Num) -> + {ok, RevBin, Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> {ok, [Bin | Acc]} end, []), + Bin = list_to_binary(lists:reverse(RevBin)), + {ok, Bin, Sp2}. + +copy(#stream{pid = _Pid, fd = Fd}, Sp, Num, DestStream) -> + copy(Fd, Sp, Num, DestStream); +copy(Fd, Sp, Num, DestStream) -> + {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, + fun(Bin, AccPointer) -> + {ok, NewPointer} = write(Bin, DestStream), + if AccPointer == null -> NewPointer; true -> AccPointer end + end, + null), + {ok, NewSp}. + +foldl(#stream{pid = _Pid, fd = Fd}, Sp, Num, Fun, Acc) -> + foldl(Fd, Sp, Num, Fun, Acc); +foldl(Fd, Sp, Num, Fun, Acc) -> + {ok, _Acc, _Sp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc). + +read_term(#stream{pid = _Pid, fd = Fd}, Sp) -> + read_term(Fd, Sp); +read_term(Fd, Sp) -> + {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2} + = read(Fd, Sp, ?STREAM_OFFSET_BYTES), + {ok, Bin, _Sp3} = read(Fd, Sp2, TermLen), + {ok, binary_to_term(Bin)}. + +write_term(Stream, Term) -> + Bin = term_to_binary(Term), + Size = size(Bin), + Bin2 = <<Size:(?STREAM_OFFSET_BITS), Bin/binary>>, + write(Stream, Bin2). + +write(#stream{}, <<>>) -> + {ok, {0,0}}; +write(#stream{pid = Pid}, Bin) when is_binary(Bin) -> + gen_server:call(Pid, {write, Bin}). + + +init({{Pos, BytesRemaining}, Fd}) -> + {ok, #write_stream + {fd = Fd, + current_pos = Pos, + bytes_remaining = BytesRemaining + }}. + +terminate(_Reason, _Stream) -> + ok. + +handle_call(get_state, _From, Stream) -> + #write_stream{current_pos = Pos, bytes_remaining = BytesRemaining} = Stream, + {reply, {Pos, BytesRemaining}, Stream}; +handle_call({set_min_buffer, MinBuffer}, _From, Stream) -> + {reply, ok, Stream#write_stream{min_alloc = MinBuffer}}; +handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) -> + #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream, + case BytesRemainingInCurrentBuffer < BufferSizeRequested of + true -> NextAlloc = BufferSizeRequested - BytesRemainingInCurrentBuffer; + false -> NextAlloc = 0 % enough room in current segment + end, + {reply, ok, Stream#write_stream{next_alloc = NextAlloc}}; +handle_call({write, Bin}, _From, Stream) -> + % ensure init is called first so we can get a pointer to the begining of the binary + {ok, Sp, Stream2} = write_data(Stream, Bin), + {reply, {ok, Sp}, Stream2}; +handle_call(close, _From, Stream) -> + #write_stream{current_pos=Pos, bytes_remaining = BytesRemaining} = Stream, + {stop, normal, {ok, {Pos, BytesRemaining}}, Stream}. + +handle_cast(_Msg, State) -> + {noreply,State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +%%% Internal function %%% + +stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) -> + {ok, Acc, Sp}; +stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) -> + {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>} + = couch_file:pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES), + Sp = {NextPos, NextOffset}, + % Check NextPos is past current Pos (this is always true in a stream) + % Guards against potential infinite loops caused by corruption. + case NextPos > Pos of + true -> ok; + false -> throw({error, stream_corruption}) + end, + stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc); +stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) -> + ReadAmount = lists:min([MaxChunk, Num, Offset]), + {ok, Bin} = couch_file:pread(Fd, Pos, ReadAmount), + Sp = {Pos + ReadAmount, Offset - ReadAmount}, + case Fun(Bin, Acc) of + {ok, Acc2} -> + stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Acc2); + {stop, Acc2} -> + {ok, Acc2, Sp} + end. + +write_data(Stream, <<>>) -> + {ok, {0,0}, Stream}; +write_data(#write_stream{bytes_remaining=0} = Stream, Bin) -> + #write_stream { + fd = Fd, + current_pos = CurrentPos, + next_alloc = NextAlloc, + min_alloc = MinAlloc + }= Stream, + + NewSize = lists:max([MinAlloc, NextAlloc, size(Bin)]), + % no space in the current segment, must alloc a new segment + {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES), + + case CurrentPos of + 0 -> + ok; + _ -> + ok = couch_file:pwrite(Fd, CurrentPos, <<NewPos:(?FILE_POINTER_BITS), NewSize:(?STREAM_OFFSET_BITS)>>) + end, + Stream2 = Stream#write_stream{ + current_pos=NewPos, + bytes_remaining=NewSize, + next_alloc=0}, + write_data(Stream2, Bin); +write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=BytesRemaining} = Stream, Bin) -> + BytesToWrite = lists:min([size(Bin), BytesRemaining]), + {WriteBin, Rest} = split_binary(Bin, BytesToWrite), + ok = couch_file:pwrite(Fd, Pos, WriteBin), + Stream2 = Stream#write_stream{ + bytes_remaining=BytesRemaining - BytesToWrite, + current_pos=Pos + BytesToWrite + }, + {ok, _, Stream3} = write_data(Stream2, Rest), + {ok, {Pos, BytesRemaining}, Stream3}. + + + +%%% Tests %%% + + +test(Term) -> + {ok, Fd} = couch_file:open("foo", [write]), + {ok, Stream} = open({0,0}, Fd), + {ok, Pos} = write_term(Stream, Term), + {ok, Pos2} = write_term(Stream, {Term, Term}), + close(Stream), + couch_file:close(Fd), + {ok, Fd2} = couch_file:open("foo", [read, write]), + {ok, Stream2} = open({0,0}, Fd2), + {ok, Term1} = read_term(Fd2, Pos), + io:format("Term1: ~w ~n",[Term1]), + {ok, Term2} = read_term(Fd2, Pos2), + io:format("Term2: ~w ~n",[Term2]), + {ok, PointerList} = deep_write_test(Stream2, Term, 1000, []), + deep_read_test(Fd2, PointerList), + close(Stream2), + couch_file:close(Fd2). + +deep_read_test(_Fd, []) -> + ok; +deep_read_test(Fd, [Pointer | RestPointerList]) -> + {ok, _Term} = read_term(Fd, Pointer), + deep_read_test(Fd, RestPointerList). + +deep_write_test(_Stream, _Term, 0, PointerList) -> + {ok, PointerList}; +deep_write_test(Stream, Term, N, PointerList) -> + WriteList = lists:duplicate(random:uniform(N), Term), + {ok, Pointer} = write_term(Stream, WriteList), + deep_write_test(Stream, Term, N-1, [Pointer | PointerList]). diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl new file mode 100644 index 00000000..42845fe0 --- /dev/null +++ b/src/couchdb/couch_util.erl @@ -0,0 +1,316 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_util). +-behaviour(gen_server). + +-export([start_link/0,start_link/1]). +-export([parse_ini/1]). +-export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]). +-export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1, test/0]). +-export([encodeBase64/1, decodeBase64/1]). + +-export([init/1, terminate/2, handle_call/3]). +-export([handle_cast/2,code_change/3,handle_info/2]). + + +start_link() -> + start_link(""). + +start_link("") -> + start_link(filename:join(code:priv_dir(couch), "lib")); +start_link(LibDir) -> + case erl_ddll:load_driver(LibDir, "couch_erl_driver") of + ok -> ok; + {error, already_loaded} -> ok; + {error, ErrorDesc} -> exit({error, ErrorDesc}) + end, + gen_server:start_link({local, couch_util}, couch_util, [], []). + + +new_uuid() -> + gen_server:call(couch_util, new_uuid). + +% returns a random integer +rand32() -> + gen_server:call(couch_util, rand32). + +% given a pathname "../foo/bar/" it gives back the fully qualified +% absolute pathname. +abs_pathname(" " ++ Filename) -> + % strip leading whitspace + abs_pathname(Filename); +abs_pathname([$/ |_]=Filename) -> + Filename; +abs_pathname(Filename) -> + {ok, Cwd} = file:get_cwd(), + {Filename2, Args} = separate_cmd_args(Filename, ""), + abs_pathname(Filename2, Cwd) ++ Args. + +abs_pathname(Filename, Dir) -> + Name = filename:absname(Filename, Dir ++ "/"), + OutFilename = filename:join(fix_path_list(filename:split(Name), [])), + % If the filename is a dir (last char slash, put back end slash + case string:right(Filename,1) of + "/" -> + OutFilename ++ "/"; + "\\" -> + OutFilename ++ "/"; + _Else-> + OutFilename + end. + +% if this as an executable with arguments, seperate out the arguments +% ""./foo\ bar.sh -baz=blah" -> {"./foo\ bar.sh", " -baz=blah"} +separate_cmd_args("", CmdAcc) -> + {lists:reverse(CmdAcc), ""}; +separate_cmd_args("\\ " ++ Rest, CmdAcc) -> % handle skipped value + separate_cmd_args(Rest, " \\" ++ CmdAcc); +separate_cmd_args(" " ++ Rest, CmdAcc) -> + {lists:reverse(CmdAcc), " " ++ Rest}; +separate_cmd_args([Char|Rest], CmdAcc) -> + separate_cmd_args(Rest, [Char | CmdAcc]). + +% lowercases string bytes that are the ascii characters A-Z. +% All other characters/bytes are ignored. +ascii_lower(String) -> + ascii_lower(String, []). + +ascii_lower([], Acc) -> + lists:reverse(Acc); +ascii_lower([Char | RestString], Acc) when Char >= $A, Char =< $B -> + ascii_lower(RestString, [Char + ($a-$A) | Acc]); +ascii_lower([Char | RestString], Acc)-> + ascii_lower(RestString, [Char | Acc]). + +% Is a character whitespace? +is_whitespace($\s)-> true; +is_whitespace($\t)-> true; +is_whitespace($\n)-> true; +is_whitespace($\r)-> true; +is_whitespace(_Else) -> false. + + +% removes leading and trailing whitespace from a string +trim(String) -> + String2 = lists:dropwhile(fun is_whitespace/1, String), + lists:reverse(lists:dropwhile(fun is_whitespace/1, lists:reverse(String2))). + +% takes a heirarchical list of dirs and removes the dots ".", double dots +% ".." and the corresponding parent dirs. +fix_path_list([], Acc) -> + lists:reverse(Acc); +fix_path_list([".."|Rest], [_PrevAcc|RestAcc]) -> + fix_path_list(Rest, RestAcc); +fix_path_list(["."|Rest], Acc) -> + fix_path_list(Rest, Acc); +fix_path_list([Dir | Rest], Acc) -> + fix_path_list(Rest, [Dir | Acc]). + + +implode(List, Sep) -> + implode(List, Sep, []). + +implode([], _Sep, Acc) -> + lists:flatten(lists:reverse(Acc)); +implode([H], Sep, Acc) -> + implode([], Sep, [H|Acc]); +implode([H|T], Sep, Acc) -> + implode(T, Sep, [Sep,H|Acc]). + + +% This is a simple ini parser. it just converts the string +% contents of a file like this: +% +%; comments are ignored +%;commentedoutvariable=foo +%this is line that gets ignored because it has no equals sign +%[this line gets ignored because it starts with a bracket but doesn't end with one +%bloodtype=Ragu +%[Some Section] +%timeout=30 +%Default=zuh ; another comment (leading space or tab before a semi is necessary to be a comment if not at beginning of line) +%[Another Section] +%key with spaces=a value with stuff; and no comment +%oops="it doesn't qet quoted strings with semis quite right ; it thinks it's part comment" +% +%And converts it into this: +%[{{"","bloodtype"},"Ragu"}, +% {{"Some Section","timeout"},"30"}, +% {{"Some section","Default"}, "zuh"}, +% {{"Another Section", "key with spaces"}, "a value with stuff; and no comment"}, +% {{"Another Section", "oops"}, "\"it doesn't qet quoted strings with semis quite right"}] +% + +parse_ini(FileContents) -> + {ok, Lines} = regexp:split(FileContents, "\r\n|\n|\r|\032"), + {_, ParsedIniValues} = + lists:foldl(fun(Line, {AccSectionName, AccValues}) -> + case string:strip(Line) of + "[" ++ Rest -> + case regexp:split(Rest, "\\]") of + {ok, [NewSectionName, ""]} -> + {NewSectionName, AccValues}; + _Else -> % end bracket not at end, ignore this line + {AccSectionName, AccValues} + end; + ";" ++ _Comment -> + {AccSectionName, AccValues}; + Line2 -> + case regexp:split(Line2, "=") of + {ok, [_SingleElement]} -> % no "=" found, ignore this line + {AccSectionName, AccValues}; + {ok, [""|_LineValues]} -> % line begins with "=", ignore + {AccSectionName, AccValues}; + {ok, [ValueName|LineValues]} -> % yeehaw, got a line! + RemainingLine = implode(LineValues, "="), + {ok, [LineValue | _Rest]} = regexp:split(RemainingLine, " ;|\t;"), % removes comments + {AccSectionName, [{{AccSectionName, ValueName}, LineValue} | AccValues]} + end + end + end, {"", []}, Lines), + {ok, lists:reverse(ParsedIniValues)}. + +init([]) -> + {A,B,C} = erlang:now(), + random:seed(A,B,C), + {ok, dummy_server}. + +terminate(_Reason, _Server) -> + ok. + +handle_call(new_uuid, _From, Server) -> + {reply, new_uuid_int(), Server}; +handle_call(rand32, _From, Server) -> + {reply, rand32_int(), Server}. + +handle_cast(_Msg, State) -> + {noreply,State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_info(_Info, State) -> + {noreply, State}. + + +new_uuid_int() -> + % eventually make this a C callout for a real guid (collisions are far less likely + % when using a proper generation function). For now we just fake it. + Num1 = random:uniform(16#FFFFFFFF + 1) - 1, + Num2 = random:uniform(16#FFFFFFFF + 1) - 1, + Num3 = random:uniform(16#FFFFFFFF + 1) - 1, + Num4 = random:uniform(16#FFFFFFFF + 1) - 1, + lists:flatten(io_lib:format("~8.16.0B~8.16.0B~8.16.0B~8.16.0B", [Num1, Num2, Num3, Num4])). + + + +rand32_int() -> + random:uniform(16#FFFFFFFF + 1) - 1. + +drv_port() -> + case get(couch_drv_port) of + undefined -> + Port = open_port({spawn, "couch_erl_driver"}, []), + put(couch_drv_port, Port), + Port; + Port -> + Port + end. + +collate(A, B) -> + collate(A, B, []). + +collate(A, B, Options) when is_list(A), is_list(B) -> + Operation = + case lists:member(nocase, Options) of + true -> 1; % Case insensitive + false -> 0 % Case sensitive + end, + Port = drv_port(), + LenA = length(A), + LenB = length(B), + Bin = list_to_binary([<<LenA:32/native>>, A, <<LenB:32/native>>, B]), + case erlang:port_control(Port, Operation, Bin) of + [0] -> -1; + [1] -> 1; + [2] -> 0 + end. + + + + +%%% Purpose : Base 64 encoding and decoding. +%%% Copied from ssl_base_64 to avoid using the +%%% erlang ssl library + +-define(st(X,A), ((X-A+256) div 256)). +-define(CHARS, 64). + +%% A PEM encoding consists of characters A-Z, a-z, 0-9, +, / and +%% =. Each character encodes a 6 bits value from 0 to 63 (A = 0, / = +%% 63); = is a padding character. +%% + +%% +%% encode64(Bytes|Binary) -> Chars +%% +%% Take 3 bytes a time (3 x 8 = 24 bits), and make 4 characters out of +%% them (4 x 6 = 24 bits). +%% +encodeBase64(Bs) when list(Bs) -> + encodeBase64(list_to_binary(Bs)); +encodeBase64(<<B:3/binary, Bs/binary>>) -> + <<C1:6, C2:6, C3:6, C4:6>> = B, + [enc(C1), enc(C2), enc(C3), enc(C4)| encodeBase64(Bs)]; +encodeBase64(<<B:2/binary>>) -> + <<C1:6, C2:6, C3:6, _:6>> = <<B/binary, 0>>, + [enc(C1), enc(C2), enc(C3), $=]; +encodeBase64(<<B:1/binary>>) -> + <<C1:6, C2:6, _:12>> = <<B/binary, 0, 0>>, + [enc(C1), enc(C2), $=, $=]; +encodeBase64(<<>>) -> + []. + +%% +%% decodeBase64(Chars) -> Binary +%% +decodeBase64(Cs) -> + list_to_binary(decode1(Cs)). + +decode1([C1, C2, $=, $=]) -> + <<B1, _:16>> = <<(dec(C1)):6, (dec(C2)):6, 0:12>>, + [B1]; +decode1([C1, C2, C3, $=]) -> + <<B1, B2, _:8>> = <<(dec(C1)):6, (dec(C2)):6, (dec(C3)):6, (dec(0)):6>>, + [B1, B2]; +decode1([C1, C2, C3, C4| Cs]) -> + Bin = <<(dec(C1)):6, (dec(C2)):6, (dec(C3)):6, (dec(C4)):6>>, + [Bin| decode1(Cs)]; +decode1([]) -> + []. + +%% enc/1 and dec/1 +%% +%% Mapping: 0-25 -> A-Z, 26-51 -> a-z, 52-61 -> 0-9, 62 -> +, 63 -> / +%% +enc(C) -> + 65 + C + 6*?st(C,26) - 75*?st(C,52) -15*?st(C,62) + 3*?st(C,63). + +dec(C) -> + 62*?st(C,43) + ?st(C,47) + (C-59)*?st(C,48) - 69*?st(C,65) - 6*?st(C,97). + + + +test() -> + start_link("debug"), + collate("a","b",[]). diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl new file mode 100644 index 00000000..612eb5fd --- /dev/null +++ b/src/couchdb/couch_view.erl @@ -0,0 +1,616 @@ +% Copyright 2007, 2008 Damien Katz <damien_katz@yahoo.com> +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. + +-module(couch_view). +-behaviour(gen_server). + +-export([start_link/1,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/4]). +-export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]). + +-include("couch_db.hrl"). + +% arbitrarily chosen amount of memory to use before flushing to disk +-define(FLUSH_MAX_MEM, 10000000). + +-record(group, + {db, + fd, + name, + def_lang, + views, + id_btree, + current_seq, + query_server=nil + }). + +-record(view, + {id_num, + name, + btree, + def + }). + +-record(server, + {root_dir + }). + +start_link(RootDir) -> + gen_server:start_link({local, couch_view}, couch_view, RootDir, []). + + + +get_temp_updater(DbName, Type, Src) -> + {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, Src}), + Pid. + +get_updater(DbName, GroupId) -> + {ok, Pid} = gen_server:call(couch_view, {start_updater, DbName, GroupId}), + Pid. + +get_updated_group(Pid) -> + Mref = erlang:monitor(process, Pid), + receive + {'DOWN', Mref, _, _, Reason} -> + throw(Reason) + after 0 -> + Pid ! {self(), get_updated}, + receive + {Pid, Response} -> + erlang:demonitor(Mref), + receive + {'DOWN', Mref, _, _, _} -> + Response + after 0 -> + Response + end; + {'DOWN', Mref, _, _, Reason} -> + throw(Reason) + end + end. + +fold(ViewInfo, Dir, Fun, Acc) -> + fold(ViewInfo, nil, Dir, Fun, Acc). + +fold({temp, DbName, Type, Src}, StartKey, Dir, Fun, Acc) -> + {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src)), + fold_view(View#view.btree, StartKey, Dir, Fun, Acc); +fold({DbName, GroupId, ViewName}, StartKey, Dir, Fun, Acc) -> + {ok, #group{views=Views}} = get_updated_group(get_updater(DbName, GroupId)), + Btree = get_view_btree(Views, ViewName), + fold_view(Btree, StartKey, Dir, Fun, Acc). + +fold_view(Btree, StartKey, Dir, Fun, Acc) -> + TotalRowCount = couch_btree:row_count(Btree), + WrapperFun = fun({{Key, DocId}, Value}, Offset, WrapperAcc) -> + Fun(DocId, Key, Value, Offset, TotalRowCount, WrapperAcc) + end, + {ok, AccResult} = couch_btree:fold(Btree, StartKey, Dir, WrapperFun, Acc), + {ok, TotalRowCount, AccResult}. + + +get_view_btree([], _ViewName) -> + throw({not_found, missing_named_view}); +get_view_btree([View | _RestViews], ViewName) when View#view.name == ViewName -> + View#view.btree; +get_view_btree([_View | RestViews], ViewName) -> + get_view_btree(RestViews, ViewName). + + +init(RootDir) -> + UpdateNotifierFun = + fun({deleted, DbName}) -> + gen_server:cast(couch_view, {reset_indexes, DbName}); + ({created, DbName}) -> + gen_server:cast(couch_view, {reset_indexes, DbName}); + (_Else) -> + ok + end, + couch_db_update_notifier:start_link(UpdateNotifierFun), + ets:new(couch_views_by_db, [bag, private, named_table]), + ets:new(couch_views_by_name, [set, protected, named_table]), + ets:new(couch_views_by_updater, [set, private, named_table]), + ets:new(couch_views_temp_fd_by_db, [set, protected, named_table]), + process_flag(trap_exit, true), + {ok, #server{root_dir=RootDir}}. + +terminate(_Reason, _) -> + catch ets:delete(couch_views_by_name), + catch ets:delete(couch_views_by_updater), + catch ets:delete(couch_views_by_db), + catch ets:delete(couch_views_temp_fd_by_db). + + +handle_call({start_temp_updater, DbName, Lang, Query}, _From, #server{root_dir=Root}=Server) -> + <<SigInt:128/integer>> = erlang:md5(Lang ++ Query), + Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])), + Pid = + case ets:lookup(couch_views_by_name, {DbName, Name}) of + [] -> + case ets:lookup(couch_views_temp_fd_by_db, DbName) of + [] -> + FileName = Root ++ "/." ++ DbName ++ "_temp", + {ok, Fd} = couch_file:open(FileName, [create, overwrite]), + Count = 0; + [{_, Fd, Count}] -> + ok + end, + couch_log:debug("Spawning new temp update process for db ~s.", [DbName]), + NewPid = spawn_link(couch_view, start_temp_update_loop, [DbName, Fd, Lang, Query]), + true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}), + add_to_ets(NewPid, DbName, Name), + NewPid; + [{_, ExistingPid0}] -> + ExistingPid0 + end, + {reply, {ok, Pid}, Server}; +handle_call({start_updater, DbName, GroupId}, _From, #server{root_dir=Root}=Server) -> + Pid = + case ets:lookup(couch_views_by_name, {DbName, GroupId}) of + [] -> + couch_log:debug("Spawning new update process for view group ~s in database ~s.", [GroupId, DbName]), + NewPid = spawn_link(couch_view, start_update_loop, [Root, DbName, GroupId]), + add_to_ets(NewPid, DbName, GroupId), + NewPid; + [{_, ExistingPid0}] -> + ExistingPid0 + end, + {reply, {ok, Pid}, Server}. + +handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> + % shutdown all the updaters + Names = ets:lookup(couch_views_by_db, DbName), + lists:foreach( + fun({_DbName, GroupId}) -> + couch_log:debug("Killing update process for view group ~s. in database ~s.", [GroupId, DbName]), + [{_, Pid}] = ets:lookup(couch_views_by_name, {DbName, GroupId}), + exit(Pid, kill), + receive {'EXIT', Pid, _} -> + delete_from_ets(Pid, DbName, GroupId) + end + end, Names), + delete_index_dir(Root, DbName), + file:delete(Root ++ "/." ++ DbName ++ "_temp"), + {noreply, Server}. + +handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) -> + case ets:lookup(couch_views_by_updater, FromPid) of + [] -> % non-updater linked process must have died, we propagate the error + exit(Reason); + [{_, {DbName, "_temp_" ++ _ = GroupId}}] -> + delete_from_ets(FromPid, DbName, GroupId), + [{_, Fd, Count}] = ets:lookup(couch_views_temp_fd_by_db, DbName), + case Count of + 1 -> % Last ref + couch_file:close(Fd), + file:delete(RootDir ++ "/." ++ DbName ++ "_temp"), + true = ets:delete(couch_views_temp_fd_by_db, DbName); + _ -> + true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count - 1}) + end; + [{_, {DbName, GroupId}}] -> + delete_from_ets(FromPid, DbName, GroupId) + end, + {noreply, Server}. + +add_to_ets(Pid, DbName, GroupId) -> + true = ets:insert(couch_views_by_updater, {Pid, {DbName, GroupId}}), + true = ets:insert(couch_views_by_name, {{DbName, GroupId}, Pid}), + true = ets:insert(couch_views_by_db, {DbName, GroupId}). + +delete_from_ets(Pid, DbName, GroupId) -> + true = ets:delete(couch_views_by_updater, Pid), + true = ets:delete(couch_views_by_name, {DbName, GroupId}), + true = ets:delete_object(couch_views_by_db, {DbName, GroupId}). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +start_update_loop(RootDir, DbName, GroupId) -> + % wait for a notify request before doing anything. This way, we can just + % exit and any exits will be noticed by the callers. + start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)). + + +start_temp_update_loop(DbName, Fd, Lang, Query) -> + NotifyPids = get_notify_pids(1000), + case couch_server:open(DbName) of + {ok, Db} -> + View = #view{name="_temp", id_num=0, btree=nil, def=Query}, + Group = #group{name="_temp", + db=Db, + views=[View], + current_seq=0, + def_lang=Lang, + id_btree=nil}, + Group2 = disk_group_to_mem(Fd, Group), + temp_update_loop(Group2, NotifyPids); + Else -> + exit(Else) + end. + +temp_update_loop(Group, NotifyPids) -> + {ok, Group2} = update_group(Group), + [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], + garbage_collect(), + temp_update_loop(Group2, get_notify_pids(100000)). + +start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> + {Db, DefLang, Defs} = + case couch_server:open(DbName) of + {ok, Db0} -> + case couch_db:open_doc(Db0, GroupId) of + {ok, Doc} -> + case couch_doc:get_view_functions(Doc) of + none -> + delete_index_file(RootDir, DbName, GroupId), + exit({not_found, no_views_found}); + {DefLang0, Defs0} -> + {Db0, DefLang0, Defs0} + end; + Else -> + delete_index_file(RootDir, DbName, GroupId), + exit(Else) + end; + Else -> + delete_index_file(RootDir, DbName, GroupId), + exit(Else) + end, + Group = open_index_file(RootDir, DbName, GroupId, DefLang, Defs), + + try update_loop(Group#group{db=Db}, NotifyPids) of + _ -> ok + catch + restart -> + couch_file:close(Group#group.fd), + start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids()) + end. + +update_loop(#group{fd=Fd}=Group, NotifyPids) -> + {ok, Group2} = update_group(Group), + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)), + [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], + garbage_collect(), + update_loop(Group2). + +update_loop(Group) -> + update_loop(Group, get_notify_pids()). + +% wait for the first request to come in. +get_notify_pids(Wait) -> + receive + {Pid, get_updated} -> + [Pid | get_notify_pids()] + after Wait -> + exit(wait_timeout) + end. +% then keep getting all available and return. +get_notify_pids() -> + receive + {Pid, get_updated} -> + [Pid | get_notify_pids()] + after 0 -> + [] + end. + +update_group(#group{db=Db,current_seq=CurrentSeq, views=Views}=Group) -> + ViewEmptyKVs = [{View, []} || View <- Views], + % compute on all docs modified since we last computed. + {ok, {UncomputedDocs, Group2, ViewKVsToAdd, DocIdViewIdKeys, NewSeq}} + = couch_db:enum_docs_since( + Db, + CurrentSeq, + fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end, + {[], Group, ViewEmptyKVs, [], CurrentSeq} + ), + + {Group3, Results} = view_compute(Group2, UncomputedDocs), + {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), + couch_query_servers:stop_doc_map(Group3#group.query_server), + if CurrentSeq /= NewSeq -> + {ok, Group4} = write_changes(Group3, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), + {ok, Group4#group{query_server=nil}}; + true -> + {ok, Group3#group{query_server=nil}} + end. + +delete_index_dir(RootDir, DbName) -> + nuke_dir(RootDir ++ "/." ++ DbName ++ "_design"). + +nuke_dir(Dir) -> + case file:list_dir(Dir) of + {error, enoent} -> ok; % doesn't exist + {ok, Files} -> + lists:foreach( + fun(File)-> + Full = Dir ++ "/" ++ File, + case file:delete(Full) of + ok -> ok; + {error, eperm} -> + ok = nuke_dir(Full) + end + end, + Files), + ok = file:del_dir(Dir) + end. + +delete_index_file(RootDir, DbName, GroupId) -> + file:delete(RootDir ++ "/." ++ DbName ++ GroupId ++ ".view"). + +open_index_file(RootDir, DbName, GroupId, ViewLang, ViewDefs) -> + FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view", + case couch_file:open(FileName) of + {ok, Fd} -> + case couch_file:read_header(Fd, <<$r, $c, $k, 0>>) of + {ok, #group{views=Views}=Group} -> + % validate all the view definitions in the index are correct. + case same_view_def(Views, ViewDefs) of + true -> disk_group_to_mem(Fd, Group); + false -> reset_header(GroupId, Fd, ViewLang, ViewDefs) + end; + _ -> + reset_header(GroupId, Fd, ViewLang, ViewDefs) + end; + _ -> + case couch_file:open(FileName, [create]) of + {ok, Fd} -> + reset_header(GroupId, Fd, ViewLang, ViewDefs); + Error -> + throw(Error) + end + end. + +same_view_def([], []) -> + true; +same_view_def(DiskViews, ViewDefs) when DiskViews == [] orelse ViewDefs == []-> + false; +same_view_def([#view{name=DiskName,def=DiskDef}|RestViews], [{Name, Def}|RestDefs]) -> + if DiskName == Name andalso DiskDef == Def -> + same_view_def(RestViews, RestDefs); + true -> + false + end. + +% Given a disk ready group structure, return an initialized, in-memory version. +disk_group_to_mem(Fd, #group{id_btree=IdState,views=Views}=Group) -> + {ok, IdBtree} = couch_btree:open(IdState, Fd), + Views2 = lists:map( + fun(#view{btree=BtreeState}=View) -> + {ok, Btree} = couch_btree:open(BtreeState, Fd, [{less, fun less_json/2}]), + View#view{btree=Btree} + end, + Views), + Group#group{fd=Fd, id_btree=IdBtree, views=Views2}. + +% Given an initialized, in-memory group structure, return a disk ready version. +mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) -> + Views2 = lists:map( + fun(#view{btree=Btree}=View) -> + State = couch_btree:get_state(Btree), + View#view{btree=State} + end, + Views), + Group#group{fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}. + +reset_header(GroupId, Fd, DefLanguage, NamedViews) -> + couch_file:truncate(Fd, 0), + {Views, _N} = lists:mapfoldl( + fun({Name, Definiton}, N) -> + {#view{name=Name, id_num=N, btree=nil, def=Definiton}, N+1} + end, + 0, NamedViews), + Group = #group{name=GroupId, + fd=Fd, + views=Views, + current_seq=0, + def_lang=DefLanguage, + id_btree=nil}, + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Group), + disk_group_to_mem(Fd, Group). + + + +less_json(A, B) -> + TypeA = type_sort(A), + TypeB = type_sort(B), + if + TypeA == TypeB -> + less_same_type(A,B); + true -> + TypeA < TypeB + end. + +type_sort(V) when is_atom(V) -> 0; +type_sort(V) when is_integer(V) -> 1; +type_sort(V) when is_float(V) -> 1; +type_sort(V) when is_list(V) -> 2; +type_sort({obj, _}) -> 4; % must come before tuple test below +type_sort(V) when is_tuple(V) -> 3; +type_sort(V) when is_binary(V) -> 5. + +atom_sort(nil) -> 0; +atom_sort(null) -> 1; +atom_sort(false) -> 2; +atom_sort(true) -> 3. + +less_same_type(A,B) when is_atom(A) -> + atom_sort(A) < atom_sort(B); +less_same_type(A,B) when is_list(A) -> + couch_util:collate(A, B) < 0; +less_same_type({obj, AProps}, {obj, BProps}) -> + less_props(AProps, BProps); +less_same_type(A, B) when is_tuple(A) -> + less_list(tuple_to_list(A),tuple_to_list(B)); +less_same_type(A, B) -> + A < B. + +ensure_list(V) when is_list(V) -> V; +ensure_list(V) when is_atom(V) -> atom_to_list(V). + +less_props([], [_|_]) -> + true; +less_props(_, []) -> + false; +less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) -> + case couch_util:collate(ensure_list(AKey), ensure_list(BKey)) of + -1 -> true; + 1 -> false; + 0 -> + case less_json(AValue, BValue) of + true -> true; + false -> + case less_json(BValue, AValue) of + true -> false; + false -> + less_props(RestA, RestB) + end + end + end. + +less_list([], [_|_]) -> + true; +less_list(_, []) -> + false; +less_list([A|RestA], [B|RestB]) -> + case less_json(A,B) of + true -> true; + false -> + case less_json(B,A) of + true -> false; + false -> + less_list(RestA, RestB) + end + end. + +process_doc(Db, DocInfo, {Docs, #group{name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) -> + % This fun computes once for each document + #doc_info{id=DocId, update_seq=Seq, deleted=Deleted} = DocInfo, + case DocId of + GroupId -> + % uh oh. this is the design doc with our definitions. See if + % anything in the definition changed. + case couch_db:open_doc(Db, DocInfo) of + {ok, Doc} -> + case couch_doc:get_view_functions(Doc) of + none -> + throw(restart); + {DefLang, NewDefs} -> + case Group#group.def_lang == DefLang andalso same_view_def(Group#group.views, NewDefs) of + true -> + % nothing changed, keeping on computing + {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; + false -> + throw(restart) + end + end; + {not_found, deleted} -> + throw(restart) + end; + ?DESIGN_DOC_PREFIX ++ _ -> % we skip design docs + {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; + _ -> + {Docs2, DocIdViewIdKeys2} = + if Deleted -> + {Docs, [{DocId, []} | DocIdViewIdKeys]}; + true -> + {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted_conflicts]), + {[Doc | Docs], DocIdViewIdKeys} + end, + case process_info(self(), memory) of + {memory, Mem} when Mem > ?FLUSH_MAX_MEM -> + {Group1, Results} = view_compute(Group, Docs2), + {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2), + {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3, Seq), + garbage_collect(), + ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views], + {ok, {[], Group2, ViewEmptyKeyValues, [], Seq}}; + _Else -> + {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2, Seq}} + end + end. + +view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) -> + {ViewKVs, DocIdViewIdKeysAcc}; +view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) -> + {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []), + NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc], + view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys). + + +view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) -> + {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)}; +view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) -> + NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs], + NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs], + NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc], + NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc, + view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc). + +view_compute(Group, []) -> + {Group, []}; +view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) -> + {ok, QueryServer} = + case QueryServerIn of + nil -> % doc map not started + Definitions = [View#view.def || View <- Group#group.views], + couch_query_servers:start_doc_map(DefLang, Definitions); + _ -> + {ok, QueryServerIn} + end, + {ok, Results} = couch_query_servers:map_docs(QueryServer, Docs), + {Group#group{query_server=QueryServer}, Results}. + + +dict_find(Key, DefaultValue, Dict) -> + case dict:find(Key, Dict) of + {ok, Value} -> + Value; + error -> + DefaultValue + end. + +write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) -> + #group{id_btree=IdBtree} = Group, + + AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []], + RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []], + LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys], + + {ok, LookupResults, IdBtree2} + = couch_btree:query_modify(IdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds), + KeysToRemoveByView = lists:foldl( + fun(LookupResult, KeysToRemoveByViewAcc) -> + case LookupResult of + {ok, {DocId, ViewIdKeys}} -> + lists:foldl( + fun({ViewId, Key}, KeysToRemoveByViewAcc2) -> + dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2) + end, + KeysToRemoveByViewAcc, ViewIdKeys); + {not_found, _} -> + KeysToRemoveByViewAcc + end + end, + dict:new(), LookupResults), + + Views2 = [ + begin + KeysToRemove = dict_find(View#view.id_num, [], KeysToRemoveByView), + {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove), + View#view{btree = ViewBtree2} + end + || + {View, AddKeyValues} <- ViewKeyValuesToAdd + ], + Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2}, + {ok, Group2}. diff --git a/src/couchdb/mod_couch.erl b/src/couchdb/mod_couch.erl new file mode 100644 index 00000000..78c0853a --- /dev/null +++ b/src/couchdb/mod_couch.erl @@ -0,0 +1,891 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mod_couch). + +-include("couch_db.hrl"). + +-export([do/1, load/2, url_decode/1]). + +-include_lib("../couch_inets/httpd.hrl"). + +-record(uri_parts, + {db = "", + doc = "", + attachment = "", + view = "", + querystr = ""}). + +-record(doc_query_args, + { + options = [], + rev = "", + open_revs = "" + }). + +%% do. This is the main entry point into Apache CouchDB from the HTTP server + +do(Mod) -> + #mod{request_uri=Uri,request_line=Request, parsed_header=Header,entity_body=Body} = Mod, + PrevTrapExit = process_flag(trap_exit, true), + Resp = + case Uri of + "/_utils/" ++ RestURI -> + % if the URI is the utils directory, then this + % tells mod_get (a std HTTP module) where to serve the file from + DocumentRoot = httpd_util:lookup(Mod#mod.config_db, document_root, ""), + {Path, AfterPath} = httpd_util:split_path(DocumentRoot ++ "/" ++ RestURI), + + case RestURI of + "" -> + Paths = httpd_util:split_path(DocumentRoot ++ "/index.html"), + {proceed, [{real_name, Paths} | Mod#mod.data]}; + _ -> + case filelib:is_file(Path) of + true -> + {proceed, [{real_name, {Path, AfterPath}} | Mod#mod.data]}; + false -> + case filelib:is_dir(Path) of + true -> + % this ends up causing a "Internal Server Error", need to fix. + {proceed, [{response,{403,"Forbidden"}}]}; + false -> + {proceed, [{response,{404,"Not found"}}]} + end + end + end; + "/favicon.ico" -> + DocumentRoot = httpd_util:lookup(Mod#mod.config_db, document_root, ""), + RealName = DocumentRoot ++ "/" ++ Uri, + {Path, AfterPath} = httpd_util:split_path(RealName), + {proceed, [{real_name, {Path, AfterPath}} | Mod#mod.data]}; + _ -> + couch_log:info("HTTP Request: ~s", [Request]), + couch_log:debug("Headers: ~p", [Header]), + couch_log:debug("Body: ~P", [Body, 100]), + case (catch parse_uri(Uri)) of + {ok, Parts} -> + {ok, ResponseCode} = + case (catch do(Mod, Parts)) of + {ok, ResponseCode0} -> + {ok, ResponseCode0}; + Error -> + send_error(Mod, Error) + end; + Error -> + {ok, ResponseCode} = send_error(Mod, Error) + end, + couch_log:info("HTTP Response Code:~p~n", [ResponseCode]), + {proceed, [{response, {already_sent, ResponseCode, 0}} | Mod#mod.data]} + end, + process_flag(trap_exit, PrevTrapExit), + Resp. + + +parse_uri(RequestUri) -> + % seperate out the path and query portions and + % strip out leading slash and question mark. + case regexp:split(RequestUri, "\\?") of + {ok, [[$/|UriPath], QueryStr]} -> ok; + {ok, [[$/|UriPath]]} -> QueryStr = "" + end, + % lets try to parse out the UriPath. + {ok, UrlParts} = regexp:split(UriPath, "/"), + + {DbName, Id, Attachment, View} = + case UrlParts of + [Db] -> + {Db, "", "", ""}; + [Db, "_design", Doc] -> + {Db, "_design/" ++ Doc, "", ""}; + [Db, "_design", Doc, Attachment0] -> + {Db, "_design/" ++ Doc, Attachment0, ""}; + [Db, "_view", Doc, ViewName] -> + {Db, "_design/" ++ Doc, "", ViewName}; + [Db, "_view%2f" ++ Doc, ViewName] -> + {Db, "_design/" ++ Doc, "", ViewName}; + [Db, Doc] -> + {Db, Doc, "", ""}; + [Db, Doc, Attachment0] -> + {Db, Doc, Attachment0, ""}; + _ -> + throw({invalid_uri, lists:flatten(io_lib:format("Uri has too many parts: ~p", [UrlParts]))}) + end, + {ok, #uri_parts{db=url_decode(DbName), + doc=url_decode(Id), + attachment=url_decode(Attachment), + view=url_decode(View), + querystr=url_decode(QueryStr)}}. + +resp_json_header(Mod) -> + resp_json_header(Mod, []). + +% return json doc header values list +resp_json_header(Mod, Options) -> + Types = string:tokens(proplists:get_value("accept", Mod#mod.parsed_header, ""), ", "), + case lists:member("application/json", Types) of + true -> + resp_header(Mod, Options) ++ [{"content-type","application/json"}]; + false -> + resp_header(Mod, Options) ++ [{"content-type","text/plain;charset=utf-8"}] + end. + +% return doc header values list +resp_header(#mod{http_version=Version}, Options) -> + [{"cache-control", "no-cache"}, + {"pragma", "no-cache"}, + {"expires", httpd_util:rfc1123_date()}] ++ + case lists:member(no_body, Options) of + true -> []; + false -> + case Version == "HTTP/1.1" of + true -> + [{"transfer-encoding", "chunked"}]; + false -> + [{"connection", "close"}] + end + end. + + +url_decode([$%, Hi, Lo | Tail]) -> + Hex = erlang:list_to_integer([Hi, Lo], 16), + xmerl_ucs:to_utf8([Hex]) ++ url_decode(Tail); +url_decode([H|T]) -> + [H |url_decode(T)]; +url_decode([]) -> + []. + + +send_header(Mod, RespCode, Headers) -> + couch_log:debug("HTTP Response Headers (code ~w): ~p", [RespCode, Headers]), + httpd_response:send_header(Mod, RespCode, Headers). + +send_chunk(Mod, Data) -> + httpd_response:send_chunk(Mod, Data, false). + +send_final_chunk(Mod) -> + httpd_response:send_final_chunk(Mod, false). + +show_couch_welcome(Mod) -> + send_header(Mod, 200, resp_json_header(Mod)), + send_chunk(Mod, "{\"couchdb\": \"Welcome\", "), + send_chunk(Mod, "\"version\": \"" ++ couch_server:get_version()), + send_chunk(Mod, "\"}\n"), + send_final_chunk(Mod), + {ok, 200}. + + +do(#mod{method="GET"}=Mod, #uri_parts{db=""}) -> + show_couch_welcome(Mod); +do(#mod{method="GET"}=Mod, #uri_parts{db="_all_dbs", doc=""}=Parts) -> + send_all_dbs(Mod, Parts); +do(#mod{method="POST"}=Mod, #uri_parts{db="_replicate", doc=""}) -> + handle_replication_request(Mod); +do(#mod{method="POST"}=Mod, #uri_parts{db="_restart", doc=""}) -> + couch_server:remote_restart(), + send_ok(Mod, 201); +do(#mod{method="POST"}=Mod, #uri_parts{doc="_missing_revs"}=Parts) -> + handle_missing_revs_request(Mod, Parts); +do(#mod{method="PUT"}=Mod, #uri_parts{doc=""}=Parts) -> + handle_db_create(Mod, Parts); +do(#mod{method="DELETE"}=Mod, #uri_parts{doc=""}=Parts) -> + handle_db_delete(Mod, Parts); +do(#mod{method="POST"}=Mod, #uri_parts{doc="_bulk_docs"}=Parts) -> + handle_bulk_doc_update(Mod, Parts); +do(#mod{method="POST"}=Mod, #uri_parts{doc=""}=Parts) -> + handle_doc_post(Mod, Parts); +do(#mod{method="PUT"}=Mod, Parts) -> + handle_doc_put(Mod, Parts); +do(#mod{method="DELETE"}=Mod, Parts) -> + handle_doc_delete(Mod, Parts); +do(#mod{method="POST"}=Mod, #uri_parts{doc="_temp_view"}=Parts) -> + send_temp_view(Mod, Parts); +do(#mod{method="GET"}=Mod, #uri_parts{doc="_all_docs"}=Parts) -> + send_all_docs(Mod, Parts); +do(#mod{method="GET"}=Mod, #uri_parts{doc="_all_docs_by_seq"}=Parts) -> + send_all_docs_by_seq(Mod, Parts); +do(#mod{method="GET"}=Mod, #uri_parts{doc=""}=Parts) -> + send_database_info(Mod, Parts); +do(#mod{method=Method}=Mod, #uri_parts{attachment="",view=""}=Parts) + when Method == "GET" orelse Method == "HEAD" -> + #doc_query_args{open_revs=Revs} = doc_parse_query(Parts#uri_parts.querystr), + case Revs of + [] -> + send_doc(Mod, Parts); + _ -> + send_doc_revs(Mod, Parts) + end; +do(#mod{method=Method}=Mod, #uri_parts{attachment=Att}=Parts) + when Att /= "", Method == "GET" orelse Method == "HEAD" -> + send_attachment(Mod, Parts); +do(#mod{method="GET"}=Mod, #uri_parts{view=View}=Parts) when View /= "" -> + send_view(Mod, Parts). + +handle_db_create(Mod, #uri_parts{db=DbName}) -> + case couch_server:create(DbName, []) of + {ok, _Db} -> + send_ok(Mod, 201); + {error, database_already_exists} -> + Msg = io_lib:format("Database ~p already exists.", [DbName]), + throw({database_already_exists, Msg}); + Error -> + Msg = io_lib:format("Error creating database ~p: ~p", [DbName, Error]), + throw({unknown_error, Msg}) + end. + +handle_db_delete(Mod, #uri_parts{db=DbName}) -> + % delete with no doc specified, therefore database delete + case couch_server:delete(DbName) of + ok -> + send_ok(Mod, 202); + Error -> + throw(Error) + end. + +handle_bulk_doc_update(#mod{entity_body=RawBody}=Mod, Parts) -> + Options = [], % put options here. + Db = open_db(Parts), + {obj, JsonProps} = cjson:decode(RawBody), + DocsArray = proplists:get_value("docs", JsonProps), + % convert all the doc elements to native docs + case proplists:get_value("new_edits", JsonProps, true) of + true -> + Docs = lists:map( + fun({obj, ObjProps} = JsonObj) -> + Doc = couch_doc:from_json_obj(JsonObj), + + Id = + case Doc#doc.id of + "" -> couch_util:new_uuid(); + Id0 -> Id0 + end, + Revs = + case proplists:get_value("_rev", ObjProps) of + undefined -> []; + Rev -> [Rev] + end, + Doc#doc{id=Id,revs=Revs} + end, + tuple_to_list(DocsArray)), + + {ok, ResultRevs} = couch_db:update_docs(Db, Docs, Options), + + % output the results + DocResults = lists:zipwith( + fun(Doc, NewRev) -> + {obj, [{"id", Doc#doc.id}, {"rev", NewRev}]} + end, + Docs, ResultRevs), + send_ok(Mod, 201, [{new_revs, list_to_tuple(DocResults)}]); + + false -> + Docs = [couch_doc:from_json_obj(JsonObj) || JsonObj <- tuple_to_list(DocsArray)], + ok = couch_db:save_docs(Db, Docs, Options), + send_ok(Mod, 201) + end. + + + + +doc_parse_query(QueryStr) -> + QueryList = httpd:parse_query(QueryStr), + lists:foldl(fun({Key,Value}, Args) -> + case {Key, Value} of + {"attachments", "true"} -> + Options = [attachments | Args#doc_query_args.options], + Args#doc_query_args{options=Options}; + {"meta", "true"} -> + Options = [revs_info, conflicts, deleted_conflicts | Args#doc_query_args.options], + Args#doc_query_args{options=Options}; + {"revs", "true"} -> + Options = [revs | Args#doc_query_args.options], + Args#doc_query_args{options=Options}; + {"revs_info", "true"} -> + Options = [revs_info | Args#doc_query_args.options], + Args#doc_query_args{options=Options}; + {"conflicts", "true"} -> + Options = [conflicts | Args#doc_query_args.options], + Args#doc_query_args{options=Options}; + {"deleted_conflicts", "true"} -> + Options = [deleted_conflicts | Args#doc_query_args.options], + Args#doc_query_args{options=Options}; + {"rev", Rev} -> + Args#doc_query_args{rev=Rev}; + {"open_revs", "all"} -> + Args#doc_query_args{open_revs=all}; + {"open_revs", RevsJsonStr} -> + JsonArray = cjson:decode(RevsJsonStr), + Args#doc_query_args{open_revs=tuple_to_list(JsonArray)}; + _Else -> % unknown key value pair, ignore. + Args + end + end, + #doc_query_args{}, QueryList). + + +handle_doc_post(#mod{entity_body=RawBody}=Mod, Parts) -> + Db = open_db(Parts), + Json = cjson:decode(RawBody), + Doc = couch_doc:from_json_obj(Json), + Id = couch_util:new_uuid(), + {ok, NewRevId} = couch_db:update_doc(Db, Doc#doc{id=Id, revs=[]}, []), + send_ok(Mod, 201, [{"id", Id}, {"rev", NewRevId}], [{"etag", NewRevId}]). + +handle_doc_put(#mod{parsed_header=Headers}=Mod, + #uri_parts{doc=Id, querystr=QueryStr}=Parts) -> + #doc_query_args{options=SaveOptions} = doc_parse_query(QueryStr), + Db = open_db(Parts), + {obj, ObjProps} = Json = cjson:decode(Mod#mod.entity_body), + Doc = couch_doc:from_json_obj(Json), + Etag = proplists:get_value("if-match", Headers, ""), + DocRev = proplists:get_value("_rev", ObjProps, ""), + + if DocRev /= "" andalso Etag /= "" andalso DocRev /= Etag -> + throw({invalid_request, "Document rev and etag have different values"}); + true -> ok + end, + Revs = + if DocRev /= "" -> [DocRev]; + Etag /= "" -> [Etag]; + true -> [] + end, + + {ok, NewRevId} = couch_db:update_doc(Db, Doc#doc{id=Id, revs=Revs}, SaveOptions), + send_ok(Mod, 201, [{"id", Id}, {"rev", NewRevId}],[{"etag", NewRevId}]). + +handle_doc_delete(#mod{parsed_header=Headers}=Mod, + #uri_parts{doc=Id, querystr=QueryStr}=Parts) -> + Db = open_db(Parts), + #doc_query_args{rev=QueryRev} = doc_parse_query(QueryStr), + Etag = proplists:get_value("if-match", Headers, ""), + RevToDelete = + case {QueryRev, Etag} of + {"", ""} -> + throw({missing_rev, "Document rev/etag must be specified to delete"}); + {_, ""} -> + QueryRev; + {"", _} -> + Etag; + _ when QueryRev == Etag -> + Etag; + _ -> + throw({invalid_request, "Document rev and etag have different values"}) + end, + {ok, NewRev} = couch_db:delete_doc(Db, Id, [RevToDelete]), + send_ok(Mod, 202, [{"id", Id}, {"rev", NewRev}]). + + +-record(query_args, + {start_key = nil, + end_key = <<>>, + count = 10000000000, % a huge huge default number. Picked so we don't have + % to do different logic for when there is no count limit + update = true, + direction = fwd, + start_docid = nil, + end_docid = <<>>, + skip = 0 + }). + +reverse_key_default(nil) -> <<>>; +reverse_key_default(<<>>) -> nil; +reverse_key_default(Key) -> Key. + +view_parse_query(QueryStr) -> + QueryList = httpd:parse_query(QueryStr), + lists:foldl(fun({Key,Value}, Args) -> + case {Key, Value} of + {"", _} -> + Args; + {"key", Value} -> + JsonKey = cjson:decode(Value), + Args#query_args{start_key=JsonKey,end_key=JsonKey}; + {"startkey_docid", DocId} -> + Args#query_args{start_docid=DocId}; + {"startkey", Value} -> + Args#query_args{start_key=cjson:decode(Value)}; + {"endkey", Value} -> + Args#query_args{end_key=cjson:decode(Value)}; + {"count", Value} -> + case (catch list_to_integer(Value)) of + Count when is_integer(Count) -> + if Count < 0 -> + Args#query_args { + direction = + if Args#query_args.direction == rev -> fwd; + true -> rev + end, + count=Count, + start_key = reverse_key_default(Args#query_args.start_key), + start_docid = reverse_key_default(Args#query_args.start_docid), + end_key = reverse_key_default(Args#query_args.end_key), + end_docid = reverse_key_default(Args#query_args.end_docid)}; + true -> + Args#query_args{count=Count} + end; + _Error -> + Msg = io_lib:format("Bad URL query value, number expected: count=~s", [Value]), + throw({query_parse_error, Msg}) + end; + {"update", "false"} -> + Args#query_args{update=false}; + {"descending", "true"} -> + case Args#query_args.direction of + fwd -> + Args#query_args { + direction = rev, + start_key = reverse_key_default(Args#query_args.start_key), + start_docid = reverse_key_default(Args#query_args.start_docid), + end_key = reverse_key_default(Args#query_args.end_key), + end_docid = reverse_key_default(Args#query_args.end_docid)}; + _ -> + Args %already reversed + end; + {"skip", Value} -> + case (catch list_to_integer(Value)) of + Count when is_integer(Count) -> + Args#query_args{skip=Count}; + _Error -> + Msg = lists:flatten(io_lib:format( + "Bad URL query value, number expected: skip=~s", [Value])), + throw({query_parse_error, Msg}) + end; + _ -> % unknown key + Msg = lists:flatten(io_lib:format( + "Bad URL query key:~s", [Key])), + throw({query_parse_error, Msg}) + end + end, + #query_args{}, QueryList). + + +% returns db, otherwise throws exception. Note: no {ok,_}. +open_db(#uri_parts{db=DbName}) -> + open_db(DbName); +open_db(DbName) when is_list(DbName)-> + case couch_server:open(DbName) of + {ok, Db} -> + Db; + Error -> + throw(Error) + end. + +handle_missing_revs_request(#mod{entity_body=RawJson}=Mod, Parts) -> + Db = open_db(Parts), + {obj, JsonDocIdRevs} = cjson:decode(RawJson), + DocIdRevs = [{Id, tuple_to_list(Revs)} || {Id, Revs} <- JsonDocIdRevs], + {ok, Results} = couch_db:get_missing_revs(Db, DocIdRevs), + JsonResults = [{Id, list_to_tuple(Revs)} || {Id, Revs} <- Results], + send_json(Mod, 200, {obj, [{missing_revs, {obj, JsonResults}}]}). + +handle_replication_request(#mod{entity_body=RawJson}=Mod) -> + {obj, Props} = cjson:decode(RawJson), + Src = proplists:get_value("source", Props), + Tgt = proplists:get_value("target", Props), + {obj, Options} = proplists:get_value("options", Props, {obj, []}), + {ok, {obj, JsonResults}} = couch_rep:replicate(Src, Tgt, Options), + send_ok(Mod, 200, JsonResults). + + + +send_database_info(Mod, #uri_parts{db=DbName}=Parts) -> + Db = open_db(Parts), + {ok, InfoList} = couch_db:get_db_info(Db), + ok = send_header(Mod, 200, resp_json_header(Mod)), + DocCount = proplists:get_value(doc_count, InfoList), + LastUpdateSequence = proplists:get_value(last_update_seq, InfoList), + ok = send_chunk(Mod, "{\"db_name\": \"" ++ DbName ++ + "\", \"doc_count\":" ++ integer_to_list(DocCount) ++ + ", \"update_seq\":" ++ integer_to_list(LastUpdateSequence)++"}"), + ok = send_final_chunk(Mod), + {ok, 200}. + +send_doc(#mod{parsed_header=Headers}=Mod, + #uri_parts{doc=DocId,querystr=QueryStr}=Parts) -> + Db = open_db(Parts), + #doc_query_args{rev=Rev, options=Options} = doc_parse_query(QueryStr), + case Rev of + "" -> + % open most recent rev + case couch_db:open_doc(Db, DocId, Options) of + {ok, #doc{revs=[DocRev|_]}=Doc} -> + Etag = proplists:get_value("if-none-match", Headers), + if Options == [] andalso Etag == DocRev -> + ok = send_header(Mod, 304, + resp_header(Mod, [no_body]) ++ [{"etag", DocRev}]), + {ok, 304}; + true -> + send_json(Mod, 200, couch_doc:to_json_obj(Doc, Options), + if Options == [] -> [{"etag", DocRev}]; true -> [] end) + end; + Error -> + throw(Error) + end; + _ -> + % open a specific rev (deletions come back as stubs) + case couch_db:open_doc_revs(Db, DocId, [Rev], Options) of + {ok, [{ok, Doc}]} -> + send_json(Mod, 200, couch_doc:to_json_obj(Doc, Options), [{"etag", Rev}]); + {ok, [Else]} -> + throw(Else) + end + end. + +send_doc_revs(Mod, #uri_parts{doc=DocId,querystr=QueryStr}=Parts) -> + Db = open_db(Parts), + #doc_query_args{options=Options, open_revs=Revs} = doc_parse_query(QueryStr), + {ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options), + ok = send_header(Mod, 200, resp_json_header(Mod)), + ok = send_chunk(Mod, "["), + % We loop through the docs. The first time through the separator + % is whitespace, then a comma on subsequent iterations. + lists:foldl( + fun(Result, AccSeparator) -> + case Result of + {ok, Doc} -> + JsonDoc= couch_doc:to_json_obj(Doc, Options), + ok = send_chunk(Mod, AccSeparator ++ lists:flatten(cjson:encode({obj, [{ok, JsonDoc}]}))); + {{not_found, missing}, RevId} -> + Json = {obj, [{"missing", RevId}]}, + ok = send_chunk(Mod, AccSeparator ++ lists:flatten(cjson:encode(Json))) + end, + "," % AccSeparator now has a comma + end, + "", Results), + ok = send_chunk(Mod, "]"), + ok = send_final_chunk(Mod), + {ok, 200}. + +send_attachment(#mod{method=Method} = Mod, + #uri_parts{doc=DocId,attachment=Attachment}=Parts) -> + Db = open_db(Parts), + case couch_db:open_doc(Db, DocId, []) of + {ok, #doc{attachments=Attachments}} -> + case proplists:get_value(Attachment, Attachments) of + undefined -> + throw({not_found, missing}); + {Type, Bin} -> + ok = send_header(Mod, 200, resp_header(Mod, Type) ++ + [{"content-type", Type}, + {"content-length", integer_to_list(couch_doc:bin_size(Bin))}]), + case Method of + "GET" -> + couch_doc:bin_foldl(Bin, + fun(BinSegment, []) -> + ok = send_chunk(Mod, BinSegment), + {ok, []} + end, + []); + "HEAD" -> + ok + end, + ok = send_final_chunk(Mod), + {ok, 200} + end; + Error -> + throw(Error) + end. + + +send_json(Mod, Code, JsonData) -> + send_json(Mod, Code, JsonData, []). + +send_json(#mod{method=Method}=Mod, Code, JsonData, Headers) -> + case Method of + "HEAD" -> + ok = send_header(Mod, Code, resp_json_header(Mod, [no_body]) ++ Headers); + _ -> + ok = send_header(Mod, Code, resp_json_header(Mod) ++ Headers), + ok = send_chunk(Mod, lists:flatten([cjson:encode(JsonData) | "\n"])), + ok = send_final_chunk(Mod) + end, + {ok, Code}. + + +send_ok(Mod, Code) -> + send_ok(Mod, Code, []). + +send_ok(Mod, Code, AdditionalProps) -> + send_ok(Mod, Code, AdditionalProps, []). + +send_ok(Mod, Code, AdditionalProps, AdditionalHeaders) -> + send_json(Mod, Code, {obj, [{ok, true}|AdditionalProps]}, AdditionalHeaders). + + +make_view_fold_fun(Mod, QueryArgs) -> + #query_args{ + end_key=EndKey, + end_docid=EndDocId, + direction=Dir, + count=Count + } = QueryArgs, + + PassedEndFun = + case Dir of + fwd -> + fun(ViewKey, ViewId) -> + couch_view:less_json({EndKey,EndDocId}, {ViewKey,ViewId}) + end; + rev-> + fun(ViewKey, ViewId) -> + couch_view:less_json({ViewKey, ViewId}, {EndKey,EndDocId}) + end + end, + + NegCountFun = + fun(Id, Key, Value, Offset, TotalViewCount, {AccCount, AccSkip, HeaderSent, AccRevRows}) -> + PassedEnd = PassedEndFun(Key, Id), + case {PassedEnd, AccCount, AccSkip, HeaderSent} of + {true,_,_,_} -> + % The stop key has been passed, stop looping. + {stop, {AccCount, AccSkip, HeaderSent, AccRevRows}}; + {_,0,_,_} -> + {stop, {0, 0, HeaderSent, AccRevRows}}; % we've done "count" rows, stop foldling + {_,_,AccSkip,_} when AccSkip > 0 -> + {ok, {AccCount, AccSkip - 1, HeaderSent, AccRevRows}}; + {_,AccCount,_,header_sent} -> + JsonObj = {obj, [{"key",Key},{"id",Id},{"value",Value}]}, + {ok, {AccCount + 1, 0, header_sent, [cjson:encode(JsonObj), "," | AccRevRows]}}; + {_,_,_,header_not_sent} -> + ok = send_header(Mod, 200, resp_json_header(Mod)), + Offset2= TotalViewCount - Offset - + lists:min([TotalViewCount - Offset, - AccCount]), + JsonBegin = io_lib:format("{\"total_rows\":~w,\"offset\":~w,\"rows\":[", + [TotalViewCount, Offset2]), + JsonObj = {obj, [{"key",Key},{"id",Id},{"value",Value}]}, + ok = send_chunk(Mod, lists:flatten(JsonBegin)), + {ok, {AccCount + 1, 0, header_sent, [cjson:encode(JsonObj) | AccRevRows]}} + end + end, + + PosCountFun = + fun(Id, Key, Value, Offset, TotalViewCount, {AccCount, AccSkip, HeaderSent, AccRevRows}) -> + PassedEnd = PassedEndFun(Key, Id), + case {PassedEnd, AccCount, AccSkip, HeaderSent} of + {true,_,_,_} -> + % The stop key has been passed, stop looping. + {stop, {AccCount, AccSkip, HeaderSent, AccRevRows}}; + {_,0,_,_} -> + {stop, {0, 0, HeaderSent, AccRevRows}}; % we've done "count" rows, stop foldling + {_,_,AccSkip,_} when AccSkip > 0 -> + {ok, {AccCount, AccSkip - 1, HeaderSent, AccRevRows}}; + {_,AccCount,_,header_sent} when (AccCount > 0) -> + JsonObj = {obj, [{"key",Key},{"id",Id},{"value",Value}]}, + ok = send_chunk(Mod, "," ++ lists:flatten(cjson:encode(JsonObj))), + {ok, {AccCount - 1, 0, header_sent, AccRevRows}}; + {_,_,_,header_not_sent} -> + ok = send_header(Mod, 200, resp_json_header(Mod)), + JsonBegin = io_lib:format("{\"total_rows\":~w,\"offset\":~w,\"rows\":[", + [TotalViewCount, Offset]), + JsonObj = {obj, [{"key",Key},{"id",Id},{"value",Value}]}, + ok = send_chunk(Mod, lists:flatten(JsonBegin ++ cjson:encode(JsonObj))), + {ok, {AccCount - 1, 0, header_sent, AccRevRows}} + end + end, + case Count > 0 of + true -> PosCountFun; + false -> NegCountFun + end. + +finish_view_fold(Mod, FoldResult) -> + case FoldResult of + {ok, TotalRows, {_, _, header_not_sent, _}} -> + % nothing found in the view, nothing has been returned + % send empty view + ok = send_header(Mod, 200, resp_json_header(Mod)), + JsonEmptyView = lists:flatten( + io_lib:format("{\"total_rows\":~w,\"rows\":[]}\n", + [TotalRows])), + ok = send_chunk(Mod, JsonEmptyView), + ok = send_final_chunk(Mod), + {ok, 200}; + {ok, _TotalRows, {_, _, header_sent, AccRevRows}} -> + % end the view + ok = send_chunk(Mod, lists:flatten(AccRevRows) ++ "]}\n"), + ok = send_final_chunk(Mod), + {ok, 200}; + Error -> + throw(Error) + end. + + +send_temp_view(#mod{entity_body=Body,parsed_header=Headers}=Mod, + #uri_parts{db=DbName, querystr=QueryStr}) -> + #query_args{ + start_key=StartKey, + count=Count, + skip=SkipCount, + direction=Dir, + start_docid=StartDocId} = QueryArgs = view_parse_query(QueryStr), + Type0 = proplists:get_value("content-type", Headers, "text/javascript"), + % remove the charset ("...;charset=foo") if its there + {ok, [Type|_]} = regexp:split(Type0, ";"), + View = {temp, DbName, Type, Body}, + Start = {StartKey, StartDocId}, + FoldlFun = make_view_fold_fun(Mod, QueryArgs), + FoldAccInit = {Count, SkipCount, header_not_sent, []}, + FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit), + finish_view_fold(Mod, FoldResult). + + +send_view(Mod, #uri_parts{db=DbName, doc=DesignDocId, view=ViewId, querystr=QueryStr}) -> + #query_args{ + start_key=StartKey, + count=Count, + skip=SkipCount, + direction=Dir, + start_docid=StartDocId} = QueryArgs = view_parse_query(QueryStr), + View = {DbName, DesignDocId, ViewId}, + Start = {StartKey, StartDocId}, + FoldlFun = make_view_fold_fun(Mod, QueryArgs), + FoldAccInit = {Count, SkipCount, header_not_sent, []}, + Result = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit), + finish_view_fold(Mod, Result). + + +send_all_docs(Mod, #uri_parts{querystr=QueryStr}=Parts) -> + Db = open_db(Parts), + #query_args{ + start_key=StartKey, + start_docid=StartDocId, + count=Count, + skip=SkipCount, + direction=Dir} = QueryArgs = view_parse_query(QueryStr), + {ok, Info} = couch_db:get_db_info(Db), + TotalRowCount = proplists:get_value(doc_count, Info), + + StartId = + if is_list(StartKey) -> StartKey; + true -> StartDocId + end, + + FoldlFun = make_view_fold_fun(Mod, QueryArgs), + AdapterFun = + fun(#full_doc_info{id=Id}=FullDocInfo, Offset, Acc) -> + case couch_doc:to_doc_info(FullDocInfo) of + #doc_info{deleted=false, rev=Rev} -> + FoldlFun(Id, Id, {obj, [{rev, Rev}]}, Offset, TotalRowCount, Acc); + #doc_info{deleted=true} -> + {ok, Acc} + end + end, + {ok, FoldResult} = couch_db:enum_docs(Db, StartId, Dir, AdapterFun, + {Count, SkipCount, header_not_sent, []}), + finish_view_fold(Mod, {ok, TotalRowCount, FoldResult}). + +send_all_docs_by_seq(Mod, #uri_parts{querystr=QueryStr}=Parts) -> + Db = open_db(Parts), + QueryArgs = view_parse_query(QueryStr), + #query_args{ + start_key=StartKey, + count=Count, + skip=SkipCount, + direction=Dir} = QueryArgs, + + {ok, Info} = couch_db:get_db_info(Db), + TotalRowCount = proplists:get_value(doc_count, Info), + + FoldlFun = make_view_fold_fun(Mod, QueryArgs), + + StartKey2 = + case StartKey of + nil -> 0; + <<>> -> 100000000000; + StartKey when is_integer(StartKey) -> StartKey + end, + {ok, FoldResult} = + couch_db:enum_docs_since(Db, StartKey2, Dir, + fun(DocInfo, Offset, Acc) -> + #doc_info{ + id=Id, + rev=Rev, + update_seq=UpdateSeq, + deleted=Deleted, + conflict_revs=ConflictRevs, + deleted_conflict_revs=DelConflictRevs} = DocInfo, + Json = + {obj, + [{"rev", Rev}] ++ + case ConflictRevs of + [] -> []; + _ -> [{"conflicts", list_to_tuple(ConflictRevs)}] + end ++ + case DelConflictRevs of + [] -> []; + _ -> [{"deleted_conflicts", list_to_tuple(DelConflictRevs)}] + end ++ + case Deleted of + true -> [{"deleted", true}]; + false -> [] + end + }, + FoldlFun(Id, UpdateSeq, Json, Offset, TotalRowCount, Acc) + end, {Count, SkipCount, header_not_sent, []}), + finish_view_fold(Mod, {ok, TotalRowCount, FoldResult}). + + + +send_all_dbs(Mod, _Parts)-> + {ok, DbNames} = couch_server:all_databases(), + ok = send_header(Mod, 200, resp_json_header(Mod)), + ok = send_chunk(Mod, lists:flatten(cjson:encode(list_to_tuple(DbNames)))), + ok = send_final_chunk(Mod), + {ok, 200}. + +send_error(Mod, Error) -> + {Json, Code} = error_to_json(Error), + couch_log:info("HTTP Error (code ~w): ~p", [Code, Json]), + send_json(Mod, Code, Json). + + + +% convert an error response into a json object and http error code. +error_to_json(Error) -> + {HttpCode, Atom, Reason} = error_to_json0(Error), + Reason1 = + case (catch io_lib:format("~s", [Reason])) of + Reason0 when is_list(Reason0) -> + lists:flatten(Reason0); + _ -> + lists:flatten(io_lib:format("~p", [Reason])) % else term to text + end, + Json = + {obj, + [{error, atom_to_list(Atom)}, + {reason, Reason1}]}, + {Json, HttpCode}. + +error_to_json0(not_found) -> + {404, not_found, "missing"}; +error_to_json0({missing_rev, Msg}) -> + {412, missing_rev, Msg}; +error_to_json0({not_found, Reason}) -> + {404, not_found, Reason}; +error_to_json0({database_already_exists, Reason}) -> + {409, database_already_exists, Reason}; +error_to_json0(conflict) -> + {412, conflict, "Update conflict"}; +error_to_json0({doc_validation, Msg}) -> + {406, doc_validation, Msg}; +error_to_json0({Id, Reason}) when is_atom(Id) -> + {500, Id, Reason}; +error_to_json0(Error) -> + {500, error, Error}. + +%% +%% Configuration +%% + +%% load + +load("Foo Bar", []) -> + {ok, [], {script_alias, {"foo", "bar"}}}. |