diff options
author | Robert Newson <robert.newson@cloudant.com> | 2011-09-28 11:18:06 +0100 |
---|---|---|
committer | Robert Newson <robert.newson@cloudant.com> | 2011-09-28 11:32:50 +0100 |
commit | 954ddf0fca558f17f39e68df8311ee9057beb390 (patch) | |
tree | 2542c112210363076be1a8bc7b24e13bfaf1c055 /apps/couch | |
parent | c8d7b6d8c3cb881d525be80bc6b16bc08822df65 (diff) | |
parent | befbdfb11f45bd2a5ccffb6b0d5ac04435ac9e55 (diff) |
Merge 1.1.x changes
Conflicts:
apps/couch/include/couch_db.hrl
apps/couch/src/couch_db.erl
apps/couch/src/couch_os_process.erl
apps/couch/src/couch_query_servers.erl
apps/couch/src/couch_rep.erl
apps/couch/src/couch_replication_manager.erl
apps/couch/src/couch_view_compactor.erl
apps/couch/src/couch_view_group.erl
apps/couch/src/couch_view_updater.erl
configure.ac
couchjs/c_src/http.c
couchjs/c_src/main.c
couchjs/c_src/utf8.c
etc/windows/couchdb.iss.tpl
src/couchdb/priv/Makefile.am
src/couchdb/priv/couch_js/main.c
test/etap/160-vhosts.t
test/etap/200-view-group-no-db-leaks.t
test/etap/Makefile.am
BugzID: 12645
Diffstat (limited to 'apps/couch')
30 files changed, 855 insertions, 269 deletions
diff --git a/apps/couch/CHANGES b/apps/couch/CHANGES index 54a2e03c..64b4f3c1 100644 --- a/apps/couch/CHANGES +++ b/apps/couch/CHANGES @@ -6,6 +6,9 @@ Version 1.1.1 This version has not been released yet. +* ETags for views include current sequence if include_docs=true. +* JSONP responses now send "text/javascript" for Content-Type. + Version 1.1.0 ------------- diff --git a/apps/couch/NEWS b/apps/couch/NEWS index 97eb58e7..d7dc7cf1 100644 --- a/apps/couch/NEWS +++ b/apps/couch/NEWS @@ -12,6 +12,9 @@ Version 1.1.1 This version has not been released yet. +* ETags for views include current sequence if include_docs=true. +* JSONP responses now send "text/javascript" for Content-Type. + Version 1.1.0 ------------- diff --git a/apps/couch/THANKS b/apps/couch/THANKS index aae7991c..76a0c19b 100644 --- a/apps/couch/THANKS +++ b/apps/couch/THANKS @@ -80,6 +80,7 @@ suggesting improvements or submitting changes. Some of these people are: * Sam Bisbee <sam@sbisbee.com> * Nathan Vander Wilt <natevw@yahoo.com> * Caolan McMahon <caolan.mcmahon@googlemail.com> - + * Alexander Shorin <kxepal@gmail.com> + * Christopher Bonhage <queezey@me.com> For a list of authors see the `AUTHORS` file. diff --git a/apps/couch/include/couch_db.hrl b/apps/couch/include/couch_db.hrl index a96d5d4f..1c425e8d 100644 --- a/apps/couch/include/couch_db.hrl +++ b/apps/couch/include/couch_db.hrl @@ -25,8 +25,20 @@ -define(DEFAULT_ATTACHMENT_CONTENT_TYPE, <<"application/octet-stream">>). --define(LOG_DEBUG(Format, Args), couch_log:debug(Format, Args)). --define(LOG_INFO(Format, Args), couch_log:info(Format, Args)). +-define(LOG_DEBUG(Format, Args), + case couch_log:debug_on() of + true -> + couch_log:debug(Format, Args); + false -> ok + end). + +-define(LOG_INFO(Format, Args), + case couch_log:info_on() of + true -> + couch_log:info(Format, Args); + false -> ok + end). + -define(LOG_ERROR(Format, Args), couch_log:error(Format, Args)). -record(rev_info, @@ -210,7 +222,6 @@ -record(group, { sig=nil, - dbname, fd=nil, name, def_lang, diff --git a/apps/couch/src/couch_btree.erl b/apps/couch/src/couch_btree.erl index 3f2e86d8..52fcaece 100644 --- a/apps/couch/src/couch_btree.erl +++ b/apps/couch/src/couch_btree.erl @@ -15,6 +15,7 @@ -export([open/2, open/3, query_modify/4, add/2, add_remove/3]). -export([fold/4, full_reduce/1, final_reduce/2, foldl/3, foldl/4]). -export([fold_reduce/4, lookup/2, get_state/1, set_options/2]). +-export([less/3]). -record(btree, {fd, @@ -109,9 +110,17 @@ full_reduce(#btree{root={_P, Red}, reduce=Reduce}) -> % wraps a 2 arity function with the proper 3 arity function convert_fun_arity(Fun) when is_function(Fun, 2) -> - fun(KV, _Reds, AccIn) -> Fun(KV, AccIn) end; + fun + (visit, KV, _Reds, AccIn) -> Fun(KV, AccIn); + (traverse, _K, _Red, AccIn) -> {ok, AccIn} + end; convert_fun_arity(Fun) when is_function(Fun, 3) -> - Fun. % Already arity 3 + fun + (visit, KV, Reds, AccIn) -> Fun(KV, Reds, AccIn); + (traverse, _K, _Red, AccIn) -> {ok, AccIn} + end; +convert_fun_arity(Fun) when is_function(Fun, 4) -> + Fun. % Already arity 4 make_key_in_end_range_function(Bt, fwd, Options) -> case couch_util:get_value(end_key_gt, Options) of @@ -614,12 +623,17 @@ stream_node(Bt, Reds, {Pointer, _Reds}, InRange, Dir, Fun, Acc) -> stream_kp_node(_Bt, _Reds, [], _InRange, _Dir, _Fun, Acc) -> {ok, Acc}; -stream_kp_node(Bt, Reds, [{_Key, {Pointer, Red}} | Rest], InRange, Dir, Fun, Acc) -> - case stream_node(Bt, Reds, {Pointer, Red}, InRange, Dir, Fun, Acc) of +stream_kp_node(Bt, Reds, [{Key, {Pointer, Red}} | Rest], InRange, Dir, Fun, Acc) -> + case Fun(traverse, Key, Red, Acc) of {ok, Acc2} -> - stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc2); - {stop, LastReds, Acc2} -> - {stop, LastReds, Acc2} + case stream_node(Bt, Reds, {Pointer, Red}, InRange, Dir, Fun, Acc2) of + {ok, Acc3} -> + stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc3); + {stop, LastReds, Acc3} -> + {stop, LastReds, Acc3} + end; + {skip, Acc2} -> + stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc2) end. drop_nodes(_Bt, Reds, _StartKey, []) -> @@ -680,7 +694,7 @@ stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], InRange, Dir, Fun, Acc) -> {stop, {PrevKVs, Reds}, Acc}; true -> AssembledKV = assemble(Bt, K, V), - case Fun(AssembledKV, {PrevKVs, Reds}, Acc) of + case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of {ok, Acc2} -> stream_kv_node2(Bt, Reds, [AssembledKV | PrevKVs], RestKVs, InRange, Dir, Fun, Acc2); {stop, Acc2} -> diff --git a/apps/couch/src/couch_changes.erl b/apps/couch/src/couch_changes.erl index 44d0ad46..4f2857b6 100644 --- a/apps/couch/src/couch_changes.erl +++ b/apps/couch/src/couch_changes.erl @@ -268,7 +268,6 @@ start_sending_changes(Callback, UserAcc, ResponseType) -> send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend) -> #changes_args{ - style = Style, include_docs = IncludeDocs, conflicts = Conflicts, limit = Limit, @@ -278,7 +277,6 @@ send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend) -> } = Args, couch_db:changes_since( Db, - Style, StartSeq, fun changes_enumerator/2, [{dir, Dir}], diff --git a/apps/couch/src/couch_db.erl b/apps/couch/src/couch_db.erl index 96c49886..c01b0a35 100644 --- a/apps/couch/src/couch_db.erl +++ b/apps/couch/src/couch_db.erl @@ -23,7 +23,7 @@ -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). -export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]). -export([set_security/2,get_security/1]). --export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]). +-export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]). -export([check_is_admin/1, check_is_reader/1, get_doc_count/1]). -export([reopen/1, make_doc/5]). @@ -293,8 +293,11 @@ get_design_docs(#db{name = <<"shards/", _/binary>> = ShardName}) -> Response end; get_design_docs(#db{id_tree=Btree}=Db) -> - {ok,_, Docs} = couch_btree:fold(Btree, - fun(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) -> + {ok, _, Docs} = couch_view:fold( + #view{btree=Btree}, + fun(#full_doc_info{deleted = true}, _Reds, AccDocs) -> + {ok, AccDocs}; + (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) -> {ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []), {ok, [Doc | AccDocs]}; (_, _Reds, AccDocs) -> @@ -987,10 +990,10 @@ enum_docs_reduce_to_count(Reds) -> fun couch_db_updater:btree_by_id_reduce/2, Reds), Count. -changes_since(Db, Style, StartSeq, Fun, Acc) -> - changes_since(Db, Style, StartSeq, Fun, [], Acc). - -changes_since(Db, Style, StartSeq, Fun, Options, Acc) -> +changes_since(Db, StartSeq, Fun, Acc) -> + changes_since(Db, StartSeq, Fun, [], Acc). + +changes_since(Db, StartSeq, Fun, Options, Acc) -> Wrapper = fun(FullDocInfo, _Offset, Acc2) -> case FullDocInfo of #full_doc_info{} -> @@ -998,17 +1001,7 @@ changes_since(Db, Style, StartSeq, Fun, Options, Acc) -> #doc_info{} -> DocInfo = FullDocInfo end, - #doc_info{revs=Revs} = DocInfo, - DocInfo2 = - case Style of - main_only -> - DocInfo; - all_docs -> - % remove revs before the seq - DocInfo#doc_info{revs=[RevInfo || - #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]} - end, - Fun(DocInfo2, Acc2) + Fun(DocInfo, Acc2) end, {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, Wrapper, Acc, [{start_key, couch_util:to_integer(StartSeq) + 1} | Options]), @@ -1028,7 +1021,8 @@ enum_docs_since(Db, SinceSeq, InFun, Acc, Options) -> {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}. enum_docs(Db, InFun, InAcc, Options) -> - {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.id_tree, InFun, InAcc, Options), + {ok, LastReduce, OutAcc} = couch_view:fold( + #view{btree=Db#db.id_tree}, InFun, InAcc, Options), {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}. %%% Internal function %%% diff --git a/apps/couch/src/couch_doc.erl b/apps/couch/src/couch_doc.erl index 33d7e3cf..827015db 100644 --- a/apps/couch/src/couch_doc.erl +++ b/apps/couch/src/couch_doc.erl @@ -302,10 +302,18 @@ to_doc_info(FullDocInfo) -> {DocInfo, _Path} = to_doc_info_path(FullDocInfo), DocInfo. -max_seq([], Max) -> - Max; -max_seq([#rev_info{seq=Seq}|Rest], Max) -> - max_seq(Rest, if Max > Seq -> Max; true -> Seq end). +max_seq(Tree) -> + FoldFun = fun({_Pos, _Key}, Value, _Type, MaxOldSeq) -> + case Value of + {_Deleted, _DiskPos, OldTreeSeq} -> + erlang:max(MaxOldSeq, OldTreeSeq); + #leaf{seq=LeafSeq} -> + erlang:max(MaxOldSeq, LeafSeq); + _ -> + MaxOldSeq + end + end, + couch_key_tree:fold(FoldFun, 0, Tree). to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree}) -> RevInfosAndPath = @@ -320,7 +328,7 @@ to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree}) -> end, RevInfosAndPath), [{_RevInfo, WinPath}|_] = SortedRevInfosAndPath, RevInfos = [RevInfo || {RevInfo, _Path} <- SortedRevInfosAndPath], - {#doc_info{id=Id, high_seq=max_seq(RevInfos, 0), revs=RevInfos}, WinPath}. + {#doc_info{id=Id, high_seq=max_seq(Tree), revs=RevInfos}, WinPath}. diff --git a/apps/couch/src/couch_httpd.erl b/apps/couch/src/couch_httpd.erl index 8fb2687c..602bdf2b 100644 --- a/apps/couch/src/couch_httpd.erl +++ b/apps/couch/src/couch_httpd.erl @@ -469,16 +469,24 @@ body_length(Req) -> Unknown -> {unknown_transfer_encoding, Unknown} end. -body(#httpd{mochi_req=MochiReq, req_body=ReqBody}) -> - case ReqBody of +body(#httpd{mochi_req=MochiReq, req_body=undefined} = Req) -> + case body_length(Req) of undefined -> - % Maximum size of document PUT request body (4GB) MaxSize = list_to_integer( couch_config:get("couchdb", "max_document_size", "4294967296")), MochiReq:recv_body(MaxSize); - _Else -> - ReqBody - end. + chunked -> + ChunkFun = fun({0, _Footers}, Acc) -> + lists:reverse(Acc); + ({_Len, Chunk}, Acc) -> + [Chunk | Acc] + end, + recv_chunked(Req, 8192, ChunkFun, []); + Len -> + MochiReq:recv_body(Len) + end; +body(#httpd{req_body=ReqBody}) -> + ReqBody. json_body(Httpd) -> ?JSON_DECODE(body(Httpd)). @@ -619,25 +627,25 @@ send_json(Req, Code, Value) -> send_json(Req, Code, [], Value). send_json(Req, Code, Headers, Value) -> + initialize_jsonp(Req), DefaultHeaders = [ {"Content-Type", negotiate_content_type(Req)}, {"Cache-Control", "must-revalidate"} ], - Body = [start_jsonp(Req), ?JSON_ENCODE(Value), end_jsonp(), $\n], + Body = [start_jsonp(), ?JSON_ENCODE(Value), end_jsonp(), $\n], send_response(Req, Code, DefaultHeaders ++ Headers, Body). start_json_response(Req, Code) -> start_json_response(Req, Code, []). start_json_response(Req, Code, Headers) -> + initialize_jsonp(Req), DefaultHeaders = [ {"Content-Type", negotiate_content_type(Req)}, {"Cache-Control", "must-revalidate"} ], - start_jsonp(Req), % Validate before starting chunked. - %start_chunked_response(Req, Code, DefaultHeaders ++ Headers). {ok, Resp} = start_chunked_response(Req, Code, DefaultHeaders ++ Headers), - case start_jsonp(Req) of + case start_jsonp() of [] -> ok; Start -> send_chunk(Resp, Start) end, @@ -647,7 +655,7 @@ end_json_response(Resp) -> send_chunk(Resp, end_jsonp() ++ [$\n]), last_chunk(Resp). -start_jsonp(Req) -> +initialize_jsonp(Req) -> case get(jsonp) of undefined -> put(jsonp, qs_value(Req, "callback", no_jsonp)); _ -> ok @@ -660,14 +668,9 @@ start_jsonp(Req) -> % make sure jsonp is configured on (default off) case couch_config:get("httpd", "allow_jsonp", "false") of "true" -> - validate_callback(CallBack), - CallBack ++ "("; + validate_callback(CallBack); _Else -> - % this could throw an error message, but instead we just ignore the - % jsonp parameter - % throw({bad_request, <<"JSONP must be configured before using.">>}) - put(jsonp, no_jsonp), - [] + put(jsonp, no_jsonp) end catch Error -> @@ -676,6 +679,13 @@ start_jsonp(Req) -> end end. +start_jsonp() -> + case get(jsonp) of + no_jsonp -> []; + [] -> []; + CallBack -> CallBack ++ "(" + end. + end_jsonp() -> Resp = case get(jsonp) of no_jsonp -> []; @@ -836,7 +846,14 @@ send_redirect(Req, Path) -> Headers = [{"Location", couch_httpd:absolute_uri(Req, Path)}], send_response(Req, 301, Headers, <<>>). -negotiate_content_type(#httpd{mochi_req=MochiReq}) -> +negotiate_content_type(Req) -> + case get(jsonp) of + no_jsonp -> negotiate_content_type1(Req); + [] -> negotiate_content_type1(Req); + _Callback -> "text/javascript" + end. + +negotiate_content_type1(#httpd{mochi_req=MochiReq}) -> %% Determine the appropriate Content-Type header for a JSON response %% depending on the Accept header in the request. A request that explicitly %% lists the correct JSON MIME type will get that type, otherwise the diff --git a/apps/couch/src/couch_httpd_db.erl b/apps/couch/src/couch_httpd_db.erl index 71204598..0bf97e26 100644 --- a/apps/couch/src/couch_httpd_db.erl +++ b/apps/couch/src/couch_httpd_db.erl @@ -128,7 +128,7 @@ handle_changes_req1(Req, Db) -> handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, Db) -> ok = couch_db:check_is_admin(Db), couch_httpd:validate_ctype(Req, "application/json"), - ok = couch_view_compactor:start_compact(DbName, Id), + {ok, _} = couch_view_compactor:start_compact(DbName, Id), send_json(Req, 202, {[{ok, true}]}); handle_compact_req(#httpd{method='POST'}=Req, Db) -> @@ -477,6 +477,7 @@ db_req(#httpd{path_parts=[_, DocId | FileNameParts]}=Req, Db) -> db_attachment_req(Req, Db, DocId, FileNameParts). all_docs_view(Req, Db, Keys) -> + RawCollator = fun(A, B) -> A < B end, #view_query_args{ start_key = StartKey, start_docid = StartDocId, @@ -486,7 +487,8 @@ all_docs_view(Req, Db, Keys) -> skip = SkipCount, direction = Dir, inclusive_end = Inclusive - } = QueryArgs = couch_httpd_view:parse_view_params(Req, Keys, map), + } = QueryArgs + = couch_httpd_view:parse_view_params(Req, Keys, map, RawCollator), {ok, Info} = couch_db:get_db_info(Db), CurrentEtag = couch_httpd:make_etag(Info), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> diff --git a/apps/couch/src/couch_httpd_show.erl b/apps/couch/src/couch_httpd_show.erl index 59f74e1c..742b0f20 100644 --- a/apps/couch/src/couch_httpd_show.erl +++ b/apps/couch/src/couch_httpd_show.erl @@ -106,13 +106,15 @@ get_fun_key(DDoc, Type, Name) -> % send_method_not_allowed(Req, "POST,PUT,DELETE,ETC"); handle_doc_update_req(#httpd{ - path_parts=[_, _, _, _, UpdateName, DocId] + path_parts=[_, _, _, _, UpdateName, DocId|Rest] }=Req, Db, DDoc) -> - Doc = try couch_httpd_db:couch_doc_open(Db, DocId, nil, [conflicts]) + DocParts = [DocId|Rest], + DocId1 = ?l2b(string:join([?b2l(P)|| P <- DocParts], "/")), + Doc = try couch_httpd_db:couch_doc_open(Db, DocId1, nil, [conflicts]) catch _ -> nil end, - send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId); + send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId1); handle_doc_update_req(#httpd{ path_parts=[_, _, _, _, UpdateName] @@ -190,14 +192,14 @@ handle_view_list_req(Req, _Db, _DDoc) -> handle_view_list(Req, Db, DDoc, LName, {ViewDesignName, ViewName}, Keys) -> ViewDesignId = <<"_design/", ViewDesignName/binary>>, {ViewType, View, Group, QueryArgs} = couch_httpd_view:load_view(Req, Db, {ViewDesignId, ViewName}, Keys), - Etag = list_etag(Req, Db, Group, View, {couch_httpd:doc_etag(DDoc), Keys}), + Etag = list_etag(Req, Db, Group, View, QueryArgs, {couch_httpd:doc_etag(DDoc), Keys}), couch_httpd:etag_respond(Req, Etag, fun() -> output_list(ViewType, Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys, Group) end). -list_etag(#httpd{user_ctx=UserCtx}=Req, Db, Group, View, More) -> +list_etag(#httpd{user_ctx=UserCtx}=Req, Db, Group, View, QueryArgs, More) -> Accept = couch_httpd:header_value(Req, "Accept"), - couch_httpd_view:view_etag(Db, Group, View, {More, Accept, UserCtx#user_ctx.roles}). + couch_httpd_view:view_etag(Db, Group, View, QueryArgs, {More, Accept, UserCtx#user_ctx.roles}). output_list(map, Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys, Group) -> output_map_list(Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys, Group); diff --git a/apps/couch/src/couch_httpd_vhost.erl b/apps/couch/src/couch_httpd_vhost.erl index 9bfb5951..03dd02ae 100644 --- a/apps/couch/src/couch_httpd_vhost.erl +++ b/apps/couch/src/couch_httpd_vhost.erl @@ -216,15 +216,19 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. +append_path("/"=_Target, "/"=_Path) -> + "/"; +append_path(Target, Path) -> + Target ++ Path. % default redirect vhost handler redirect_to_vhost(MochiReq, VhostTarget) -> Path = MochiReq:get(raw_path), - Target = VhostTarget ++ Path, + Target = append_path(VhostTarget, Path), ?LOG_DEBUG("Vhost Target: '~p'~n", [Target]), - + Headers = mochiweb_headers:enter("x-couchdb-vhost-path", Path, MochiReq:get(headers)), @@ -356,8 +360,8 @@ split_host_port(HostAsString) -> {split_host(HostAsString), '*'}; N -> HostPart = string:substr(HostAsString, 1, N-1), - case (catch erlang:list_to_integer(HostAsString, N+1, - length(HostAsString))) of + case (catch erlang:list_to_integer(string:substr(HostAsString, + N+1, length(HostAsString)))) of {'EXIT', _} -> {split_host(HostAsString), '*'}; Port -> diff --git a/apps/couch/src/couch_httpd_view.erl b/apps/couch/src/couch_httpd_view.erl index b71fc2c6..082a5039 100644 --- a/apps/couch/src/couch_httpd_view.erl +++ b/apps/couch/src/couch_httpd_view.erl @@ -15,9 +15,9 @@ -export([handle_view_req/3,handle_temp_view_req/2]). --export([parse_view_params/3]). +-export([parse_view_params/4]). -export([make_view_fold_fun/7, finish_view_fold/4, finish_view_fold/5, view_row_obj/4]). --export([view_etag/3, view_etag/4, make_reduce_fold_funs/6]). +-export([view_etag/5, make_reduce_fold_funs/6]). -export([design_doc_view/5, parse_bool_param/1, doc_member/3]). -export([make_key_options/1, load_view/4]). @@ -34,18 +34,19 @@ design_doc_view(Req, Db, DName, ViewName, Keys) -> Reduce = get_reduce_type(Req), Result = case couch_view:get_map_view(Db, DesignId, ViewName, Stale) of {ok, View, Group} -> - QueryArgs = parse_view_params(Req, Keys, map), + QueryArgs = parse_view_params(Req, Keys, map, view_collator(View)), output_map_view(Req, View, Group, Db, QueryArgs, Keys); {not_found, Reason} -> case couch_view:get_reduce_view(Db, DesignId, ViewName, Stale) of {ok, ReduceView, Group} -> + Collator = view_collator(ReduceView), case Reduce of false -> - QueryArgs = parse_view_params(Req, Keys, red_map), + QueryArgs = parse_view_params(Req, Keys, red_map, Collator), MapView = couch_view:extract_map_view(ReduceView), output_map_view(Req, MapView, Group, Db, QueryArgs, Keys); _ -> - QueryArgs = parse_view_params(Req, Keys, reduce), + QueryArgs = parse_view_params(Req, Keys, reduce, Collator), output_reduce_view(Req, Db, ReduceView, Group, QueryArgs, Keys) end; _ -> @@ -90,19 +91,19 @@ handle_temp_view_req(#httpd{method='POST'}=Req, Db) -> Reduce = get_reduce_type(Req), case couch_util:get_value(<<"reduce">>, Props, null) of null -> - QueryArgs = parse_view_params(Req, Keys, map), {ok, View, Group} = couch_view:get_temp_map_view(Db, Language, DesignOptions, MapSrc), + QueryArgs = parse_view_params(Req, Keys, map, view_collator(View)), output_map_view(Req, View, Group, Db, QueryArgs, Keys); _ when Reduce =:= false -> - QueryArgs = parse_view_params(Req, Keys, red_map), {ok, View, Group} = couch_view:get_temp_map_view(Db, Language, DesignOptions, MapSrc), + QueryArgs = parse_view_params(Req, Keys, red_map, view_collator(View)), output_map_view(Req, View, Group, Db, QueryArgs, Keys); RedSrc -> - QueryArgs = parse_view_params(Req, Keys, reduce), {ok, View, Group} = couch_view:get_temp_reduce_view(Db, Language, DesignOptions, MapSrc, RedSrc), + QueryArgs = parse_view_params(Req, Keys, reduce, view_collator(View)), output_reduce_view(Req, Db, View, Group, QueryArgs, Keys) end; @@ -114,7 +115,7 @@ output_map_view(Req, View, Group, Db, QueryArgs, nil) -> limit = Limit, skip = SkipCount } = QueryArgs, - CurrentEtag = view_etag(Db, Group, View), + CurrentEtag = view_etag(Db, Group, View, QueryArgs), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> {ok, RowCount} = couch_view:get_row_count(View), FoldlFun = make_view_fold_fun(Req, QueryArgs, CurrentEtag, Db, Group#group.current_seq, RowCount, #view_fold_helper_funs{reduce_count=fun couch_view:reduce_to_count/1}), @@ -130,7 +131,7 @@ output_map_view(Req, View, Group, Db, QueryArgs, Keys) -> limit = Limit, skip = SkipCount } = QueryArgs, - CurrentEtag = view_etag(Db, Group, View, Keys), + CurrentEtag = view_etag(Db, Group, View, QueryArgs, Keys), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> {ok, RowCount} = couch_view:get_row_count(View), FoldAccInit = {Limit, SkipCount, undefined, []}, @@ -155,7 +156,7 @@ output_reduce_view(Req, Db, View, Group, QueryArgs, nil) -> skip = Skip, group_level = GroupLevel } = QueryArgs, - CurrentEtag = view_etag(Db, Group, View), + CurrentEtag = view_etag(Db, Group, View, QueryArgs), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> {ok, GroupRowsFun, RespFun} = make_reduce_fold_funs(Req, GroupLevel, QueryArgs, CurrentEtag, Group#group.current_seq, @@ -173,7 +174,7 @@ output_reduce_view(Req, Db, View, Group, QueryArgs, Keys) -> skip = Skip, group_level = GroupLevel } = QueryArgs, - CurrentEtag = view_etag(Db, Group, View, Keys), + CurrentEtag = view_etag(Db, Group, View, QueryArgs, Keys), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> {ok, GroupRowsFun, RespFun} = make_reduce_fold_funs(Req, GroupLevel, QueryArgs, CurrentEtag, Group#group.current_seq, @@ -209,18 +210,19 @@ load_view(Req, Db, {ViewDesignId, ViewName}, Keys) -> Reduce = get_reduce_type(Req), case couch_view:get_map_view(Db, ViewDesignId, ViewName, Stale) of {ok, View, Group} -> - QueryArgs = parse_view_params(Req, Keys, map), + QueryArgs = parse_view_params(Req, Keys, map, view_collator(View)), {map, View, Group, QueryArgs}; {not_found, _Reason} -> case couch_view:get_reduce_view(Db, ViewDesignId, ViewName, Stale) of {ok, ReduceView, Group} -> + Collator = view_collator(ReduceView), case Reduce of false -> - QueryArgs = parse_view_params(Req, Keys, map_red), + QueryArgs = parse_view_params(Req, Keys, map_red, Collator), MapView = couch_view:extract_map_view(ReduceView), {map, MapView, Group, QueryArgs}; _ -> - QueryArgs = parse_view_params(Req, Keys, reduce), + QueryArgs = parse_view_params(Req, Keys, reduce, Collator), {reduce, ReduceView, Group, QueryArgs} end; {not_found, Reason} -> @@ -228,12 +230,30 @@ load_view(Req, Db, {ViewDesignId, ViewName}, Keys) -> end end. +view_collator({reduce, _N, _Lang, View}) -> + view_collator(View); + +view_collator({temp_reduce, View}) -> + view_collator(View); + +view_collator(#view{btree=Btree}) -> + % Return an "is-less-than" predicate by calling into the btree's + % collator. For raw collation, couch_btree compares arbitrary + % Erlang terms, but for normal (ICU) collation, it expects + % {Json, Id} tuples. + fun + ({_JsonA, _IdA}=A, {_JsonB, _IdB}=B) -> + couch_btree:less(Btree, A, B); + (JsonA, JsonB) -> + couch_btree:less(Btree, {JsonA, null}, {JsonB, null}) + end. + % query_parse_error could be removed % we wouldn't need to pass the view type, it'd just parse params. % I'm not sure what to do about the error handling, but % it might simplify things to have a parse_view_params function % that doesn't throw(). -parse_view_params(Req, Keys, ViewType) -> +parse_view_params(Req, Keys, ViewType, LessThan) -> QueryList = couch_httpd:qs(Req), QueryParams = lists:foldl(fun({K, V}, Acc) -> @@ -247,7 +267,7 @@ parse_view_params(Req, Keys, ViewType) -> QueryArgs = lists:foldl(fun({K, V}, Args2) -> validate_view_query(K, V, Args2) end, Args, lists:reverse(QueryParams)), % Reverse to match QS order. - warn_on_empty_key_range(QueryArgs), + warn_on_empty_key_range(QueryArgs, LessThan), GroupLevel = QueryArgs#view_query_args.group_level, case {ViewType, GroupLevel, IsMultiGet} of {reduce, exact, true} -> @@ -328,15 +348,15 @@ parse_view_param("callback", _) -> parse_view_param(Key, Value) -> [{extra, {Key, Value}}]. -warn_on_empty_key_range(#view_query_args{start_key=undefined}) -> +warn_on_empty_key_range(#view_query_args{start_key=undefined}, _Lt) -> ok; -warn_on_empty_key_range(#view_query_args{end_key=undefined}) -> +warn_on_empty_key_range(#view_query_args{end_key=undefined}, _Lt) -> ok; -warn_on_empty_key_range(#view_query_args{start_key=A, end_key=A}) -> +warn_on_empty_key_range(#view_query_args{start_key=A, end_key=A}, _Lt) -> ok; warn_on_empty_key_range(#view_query_args{ - start_key=StartKey, end_key=EndKey, direction=Dir}) -> - case {Dir, couch_view:less_json(StartKey, EndKey)} of + start_key=StartKey, end_key=EndKey, direction=Dir}, LessThan) -> + case {Dir, LessThan(StartKey, EndKey)} of {fwd, false} -> throw({query_parse_error, <<"No rows can match your key range, reverse your ", @@ -640,14 +660,16 @@ send_json_reduce_row(Resp, {Key, Value}, RowFront) -> send_chunk(Resp, RowFront ++ ?JSON_ENCODE({[{key, Key}, {value, Value}]})), {ok, ",\r\n"}. -view_etag(Db, Group, View) -> - view_etag(Db, Group, View, nil). +view_etag(Db, Group, View, QueryArgs) -> + view_etag(Db, Group, View, QueryArgs, nil). -view_etag(Db, Group, {reduce, _, _, View}, Extra) -> - view_etag(Db, Group, View, Extra); -view_etag(Db, Group, {temp_reduce, View}, Extra) -> - view_etag(Db, Group, View, Extra); -view_etag(_Db, #group{sig=Sig}, #view{update_seq=UpdateSeq, purge_seq=PurgeSeq}, Extra) -> +view_etag(Db, Group, {reduce, _, _, View}, QueryArgs, Extra) -> + view_etag(Db, Group, View, QueryArgs, Extra); +view_etag(Db, Group, {temp_reduce, View}, QueryArgs, Extra) -> + view_etag(Db, Group, View, QueryArgs, Extra); +view_etag(_Db, #group{sig=Sig, current_seq=CurrentSeq}, _View, #view_query_args{include_docs=true}, Extra) -> + couch_httpd:make_etag({Sig, CurrentSeq, Extra}); +view_etag(_Db, #group{sig=Sig}, #view{update_seq=UpdateSeq, purge_seq=PurgeSeq}, _QueryArgs, Extra) -> couch_httpd:make_etag({Sig, UpdateSeq, PurgeSeq, Extra}). % the view row has an error diff --git a/apps/couch/src/couch_key_tree.erl b/apps/couch/src/couch_key_tree.erl index 2f3c6abf..5e24e0f7 100644 --- a/apps/couch/src/couch_key_tree.erl +++ b/apps/couch/src/couch_key_tree.erl @@ -50,7 +50,7 @@ -export([merge/3, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2, compute_data_size/1]). -export([map/2, get_all_leafs/1, count_leafs/1, remove_leafs/2, - get_all_leafs_full/1,stem/2,map_leafs/2]). + get_all_leafs_full/1,stem/2,map_leafs/2, fold/3]). -include("couch_db.hrl"). @@ -373,6 +373,21 @@ tree_fold_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree], Acc) -> Acc2 end. +fold(_Fun, Acc, []) -> + Acc; +fold(Fun, Acc0, [{Pos, Tree}|Rest]) -> + Acc1 = fold_simple(Fun, Acc0, Pos, [Tree]), + fold(Fun, Acc1, Rest). + +fold_simple(_Fun, Acc, _Pos, []) -> + Acc; +fold_simple(Fun, Acc0, Pos, [{Key, Value, SubTree} | RestTree]) -> + Type = if SubTree == [] -> leaf; true -> branch end, + Acc1 = Fun({Pos, Key}, Value, Type, Acc0), + Acc2 = fold_simple(Fun, Acc1, Pos+1, SubTree), + fold_simple(Fun, Acc2, Pos, RestTree). + + map(_Fun, []) -> []; map(Fun, [{Pos, Tree}|Rest]) -> diff --git a/apps/couch/src/couch_log.erl b/apps/couch/src/couch_log.erl index 9bac7450..362d092d 100644 --- a/apps/couch/src/couch_log.erl +++ b/apps/couch/src/couch_log.erl @@ -25,22 +25,12 @@ -define(LEVEL_TMI, 0). debug(Format, Args) -> - case debug_on() of - false -> - ok; - true -> - {ConsoleMsg, FileMsg} = get_log_messages(self(), debug, Format, Args), - gen_event:sync_notify(error_logger, {couch_debug, ConsoleMsg, FileMsg}) - end. + {ConsoleMsg, FileMsg} = get_log_messages(self(), debug, Format, Args), + gen_event:sync_notify(error_logger, {couch_debug, ConsoleMsg, FileMsg}). info(Format, Args) -> - case info_on() of - false -> - ok; - true -> - {ConsoleMsg, FileMsg} = get_log_messages(self(), info, Format, Args), - gen_event:sync_notify(error_logger, {couch_info, ConsoleMsg, FileMsg}) - end. + {ConsoleMsg, FileMsg} = get_log_messages(self(), info, Format, Args), + gen_event:sync_notify(error_logger, {couch_info, ConsoleMsg, FileMsg}). error(Format, Args) -> {ConsoleMsg, FileMsg} = get_log_messages(self(), error, Format, Args), @@ -180,6 +170,15 @@ get_log_messages(Pid, Level, Format, Args) -> read(Bytes, Offset) -> LogFileName = couch_config:get("log", "file"), LogFileSize = filelib:file_size(LogFileName), + MaxChunkSize = list_to_integer( + couch_config:get("httpd", "log_max_chunk_size", "1000000")), + case Bytes > MaxChunkSize of + true -> + throw({bad_request, "'bytes' cannot exceed " ++ + integer_to_list(MaxChunkSize)}); + false -> + ok + end, {ok, Fd} = file:open(LogFileName, [read]), Start = lists:max([LogFileSize - Bytes, 0]) + Offset, @@ -188,4 +187,5 @@ read(Bytes, Offset) -> % TODO: make streaming {ok, Chunk} = file:pread(Fd, Start, LogFileSize), + ok = file:close(Fd), Chunk. diff --git a/apps/couch/src/couch_os_process.erl b/apps/couch/src/couch_os_process.erl index 7fe8aa89..2a6d92a7 100644 --- a/apps/couch/src/couch_os_process.erl +++ b/apps/couch/src/couch_os_process.erl @@ -173,10 +173,7 @@ handle_info({Port, {exit_status, 0}}, #os_proc{port=Port}=OsProc) -> {stop, normal, OsProc}; handle_info({Port, {exit_status, Status}}, #os_proc{port=Port}=OsProc) -> ?LOG_ERROR("OS Process died with status: ~p", [Status]), - {stop, {exit_status, Status}, OsProc}; -handle_info(Msg, OsProc) -> - ?LOG_DEBUG("OS Proc: Unknown info: ~p", [Msg]), - {noreply, OsProc}. + {stop, {exit_status, Status}, OsProc}. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/apps/couch/src/couch_query_servers.erl b/apps/couch/src/couch_query_servers.erl index be7c465b..9714a0ca 100644 --- a/apps/couch/src/couch_query_servers.erl +++ b/apps/couch/src/couch_query_servers.erl @@ -17,9 +17,9 @@ -export([filter_docs/5]). -export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]). --export([get_os_process/1, ret_os_process/1]). -% -export([test/0]). +% For 210-os-proc-pool.t +-export([get_os_process/1, ret_os_process/1]). -include("couch_db.hrl"). diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index 2d011aab..46bcb282 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -20,6 +20,7 @@ -export([start_replication/4, end_replication/1, get_result/4]). -include("couch_db.hrl"). +-include("couch_js_functions.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). -define(REP_ID_VERSION, 2). @@ -110,20 +111,20 @@ checkpoint(Server) -> gen_server:cast(Server, do_checkpoint). get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) -> - case couch_util:get_value(<<"continuous">>, Props, false) of - true -> - {ok, {continuous, ?l2b(BaseId)}}; - false -> - try gen_server:call(Server, get_result, infinity) of - retry -> replicate(PostBody, UserCtx); - Else -> Else - catch - exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} -> - %% oops, this replication just finished -- restart it. - replicate(PostBody, UserCtx); - exit:{normal, {gen_server, call, [Server, get_result , infinity]}} -> - %% we made the call during terminate - replicate(PostBody, UserCtx) + case couch_util:get_value(<<"continuous">>, Props, false) of + true -> + {ok, {continuous, ?l2b(BaseId)}}; + false -> + try gen_server:call(Server, get_result, infinity) of + retry -> replicate(PostBody, UserCtx); + Else -> Else + catch + exit:{noproc, {gen_server, call, [Server, get_result, infinity]}} -> + %% oops, this replication just finished -- restart it. + replicate(PostBody, UserCtx); + exit:{normal, {gen_server, call, [Server, get_result, infinity]}} -> + %% we made the call during terminate + replicate(PostBody, UserCtx) end end. @@ -154,13 +155,13 @@ do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx, Module] = InitArgs) -> [SourceLog, TargetLog] = find_replication_logs( [Source, Target], BaseId, {PostProps}, UserCtx), - {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), + {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), - {ok, ChangesFeed} = + {ok, ChangesFeed} = couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps), - {ok, MissingRevs} = + {ok, MissingRevs} = couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps), - {ok, Reader} = + {ok, Reader} = couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps), {ok, Writer} = couch_rep_writer:start_link(self(), Target, Reader, PostProps), @@ -545,7 +546,7 @@ filter_code(Filter, Props, UserCtx) -> DocErrorMsg = io_lib:format( "Couldn't open document `_design/~s` from source " "database `~s`: ~s", - [dbname(Source), DDocName, couch_util:to_binary(DocError)]), + [DDocName, dbname(Source), couch_util:to_binary(DocError)]), throw({error, iolist_to_binary(DocErrorMsg)}) end, Code = couch_util:get_nested_json_value( @@ -649,18 +650,18 @@ open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) -> try - case CreateTarget of - true -> - ok = couch_httpd:verify_is_server_admin(UserCtx), - couch_server:create(DbName, [{user_ctx, UserCtx}]); + case CreateTarget of + true -> + ok = couch_httpd:verify_is_server_admin(UserCtx), + couch_server:create(DbName, [{user_ctx, UserCtx}]); false -> ok - end, + end, - case couch_db:open(DbName, [{user_ctx, UserCtx}]) of - {ok, Db} -> - couch_db:monitor(Db), - Db; + case couch_db:open(DbName, [{user_ctx, UserCtx}]) of + {ok, Db} -> + couch_db:monitor(Db), + Db; {not_found, no_db_file} -> throw({db_not_found, DbName}) end diff --git a/apps/couch/src/couch_rep_changes_feed.erl b/apps/couch/src/couch_rep_changes_feed.erl index 36fe82aa..49edf7cc 100644 --- a/apps/couch/src/couch_rep_changes_feed.erl +++ b/apps/couch/src/couch_rep_changes_feed.erl @@ -154,7 +154,7 @@ init([Parent, #http_db{headers = Headers0} = Source, Since, PostProps]) -> end; {ibrowse_async_headers, ReqId, Code, _} -> {stop, {changes_error_code, list_to_integer(Code)}} - after 10000 -> + after 30000 -> {stop, changes_timeout} end; diff --git a/apps/couch/src/couch_rep_reader.erl b/apps/couch/src/couch_rep_reader.erl index 0d344e5c..1e8ca074 100644 --- a/apps/couch/src/couch_rep_reader.erl +++ b/apps/couch/src/couch_rep_reader.erl @@ -244,7 +244,7 @@ reader_loop(ReaderServer, Parent, Source, MissingRevsServer) -> case couch_rep_missing_revs:next(MissingRevsServer) of complete -> exit(complete); - {HighSeq, IdsRevs} -> + {_HighSeq, IdsRevs} -> % to be safe, make sure Results are sorted by source_seq SortedIdsRevs = lists:keysort(2, IdsRevs), RequestSeqs = [S || {_,S,_} <- SortedIdsRevs], @@ -255,8 +255,8 @@ reader_loop(ReaderServer, Parent, Source, MissingRevsServer) -> infinity) || {Id,Seq,Revs} <- SortedIdsRevs], reader_loop(ReaderServer, Parent, Source, MissingRevsServer); _Local -> - {ok, Source1} = gen_server:call(Parent, get_source_db, infinity), - Source2 = maybe_reopen_db(Source1, HighSeq), + {ok, Source2} = couch_db:open( + Source#db.name, [{user_ctx, Source#db.user_ctx}]), lists:foreach(fun({Id,Seq,Revs}) -> {ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]), JustTheDocs = [Doc || {ok, Doc} <- Docs], @@ -268,12 +268,6 @@ reader_loop(ReaderServer, Parent, Source, MissingRevsServer) -> end end. -maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq -> - {ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]), - NewDb; -maybe_reopen_db(Db, _HighSeq) -> - Db. - spawn_document_request(Source, Id, Seq, Revs) -> Server = self(), SpawnFun = fun() -> diff --git a/apps/couch/src/couch_rep_writer.erl b/apps/couch/src/couch_rep_writer.erl index 2b722e8e..40323925 100644 --- a/apps/couch/src/couch_rep_writer.erl +++ b/apps/couch/src/couch_rep_writer.erl @@ -26,7 +26,8 @@ writer_loop(Parent, Reader) -> ok; {HighSeq, Docs} -> DocCount = length(Docs), - {ok, Target} = gen_server:call(Parent, get_target_db, infinity), + {ok, Target0} = gen_server:call(Parent, get_target_db, infinity), + Target = open_db(Target0), try write_docs(Target, Docs) of {ok, []} -> Parent ! {update_stats, docs_written, DocCount}; @@ -38,6 +39,8 @@ writer_loop(Parent, Reader) -> {attachment_request_failed, Err} -> ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]), exit({attachment_request_failed, Err, Docs}) + after + close_db(Target) end, Parent ! {writer_checkpoint, HighSeq}, couch_rep_att:cleanup(), @@ -163,3 +166,14 @@ write_docs_1({Props}) -> ErrId = couch_util:to_existing_atom(couch_util:get_value(<<"error">>, Props)), Reason = couch_util:get_value(<<"reason">>, Props), {{Id, Rev}, {ErrId, Reason}}. + +open_db(#db{name = Name, user_ctx = UserCtx}) -> + {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx}]), + Db; +open_db(HttpDb) -> + HttpDb. + +close_db(#db{} = Db) -> + couch_db:close(Db); +close_db(_HttpDb) -> + ok. diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl index 3f7cc27c..3715cea1 100644 --- a/apps/couch/src/couch_replication_manager.erl +++ b/apps/couch/src/couch_replication_manager.erl @@ -317,7 +317,7 @@ process_update(State, {Change}) -> <<"error">> -> case ets:lookup(?DOC_TO_REP, DocId) of [] -> - maybe_start_replication(State, DocId, JsonRepDoc); + maybe_start_replication(State, DocId, JsonRepDoc); _ -> State end diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl index 05174245..8d479d7e 100644 --- a/apps/couch/src/couch_view.erl +++ b/apps/couch/src/couch_view.erl @@ -260,8 +260,16 @@ fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) -> fold(#view{btree=Btree}, Fun, Acc, Options) -> WrapperFun = - fun(KV, Reds, Acc2) -> - fold_fun(Fun, expand_dups([KV],[]), Reds, Acc2) + fun(visit, KV, Reds, Acc2) -> + fold_fun(Fun, expand_dups([KV],[]), Reds, Acc2); + (traverse, LK, Red, Acc2) + when is_function(Fun, 4) -> + Fun(traverse, LK, Red, Acc2); + (traverse, _LK, Red, {_, Skip, _, _} = Acc2) + when Skip >= element(1, Red) -> + {skip, setelement(2, Acc2, Skip - element(1, Red))}; + (traverse, _, _, Acc2) -> + {ok, Acc2} end, {ok, _LastReduce, _AccResult} = couch_btree:fold(Btree, WrapperFun, Acc, Options). diff --git a/apps/couch/src/couch_view_compactor.erl b/apps/couch/src/couch_view_compactor.erl index 69aaff00..8ea1dca2 100644 --- a/apps/couch/src/couch_view_compactor.erl +++ b/apps/couch/src/couch_view_compactor.erl @@ -20,14 +20,14 @@ %% @doc Compacts the views. GroupId must not include the _design/ prefix start_compact(DbName, GroupId) -> Pid = couch_view:get_group_server(DbName, <<"_design/",GroupId/binary>>), - gen_server:cast(Pid, {start_compact, fun compact_group/2}). + gen_server:call(Pid, {start_compact, fun compact_group/3}). %%============================================================================= %% internal functions %%============================================================================= %% @spec compact_group(Group, NewGroup) -> ok -compact_group(Group, EmptyGroup) -> +compact_group(Group, EmptyGroup, DbName) -> #group{ current_seq = Seq, id_btree = IdBtree, @@ -36,10 +36,8 @@ compact_group(Group, EmptyGroup) -> } = Group, #group{ - dbname = DbName, fd = Fd, id_btree = EmptyIdBtree, - sig = Sig, views = EmptyViews } = EmptyGroup, @@ -82,9 +80,26 @@ compact_group(Group, EmptyGroup) -> views=NewViews, current_seq=Seq }, - - Pid = ets:lookup_element(group_servers_by_sig, {DbName, Sig}, 2), - gen_server:cast(Pid, {compact_done, NewGroup}). + maybe_retry_compact(Db, GroupId, NewGroup). + +maybe_retry_compact(#db{name = DbName} = Db, GroupId, NewGroup) -> + #group{sig = Sig, fd = NewFd} = NewGroup, + Header = {Sig, couch_view_group:get_index_header_data(NewGroup)}, + ok = couch_file:write_header(NewFd, Header), + Pid = ets:lookup_element(group_servers_by_sig, {DbName, Sig}, 2), + case gen_server:call(Pid, {compact_done, NewGroup}) of + ok -> + couch_db:close(Db); + update -> + {ok, Db2} = couch_db:reopen(Db), + {_, Ref} = erlang:spawn_monitor(fun() -> + couch_view_updater:update(nil, NewGroup, Db2) + end), + receive + {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> + maybe_retry_compact(Db2, GroupId, NewGroup2) + end + end. %% @spec compact_view(View, EmptyView) -> CompactView compact_view(View, EmptyView) -> diff --git a/apps/couch/src/couch_view_group.erl b/apps/couch/src/couch_view_group.erl index 11cb4c60..87e4cd2e 100644 --- a/apps/couch/src/couch_view_group.erl +++ b/apps/couch/src/couch_view_group.erl @@ -17,6 +17,9 @@ -export([start_link/1, request_group/2, trigger_group_update/2, request_group_info/1]). -export([open_db_group/2, open_temp_group/5, design_doc_to_view_group/1,design_root/2]). +%% Exports for the compactor +-export([get_index_header_data/1]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -80,8 +83,7 @@ start_link(InitArgs) -> init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), case prepare_group(InitArgs, false) of - {ok, #group{fd=Fd, current_seq=Seq}=Group} -> - {ok, Db} = couch_db:open(DbName, []), + {ok, Db, #group{fd=Fd, current_seq=Seq}=Group} -> case Seq > couch_db:get_update_seq(Db) of true -> ReturnPid ! {Ref, self(), {error, invalid_view_seq}}, @@ -92,7 +94,7 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> {ok, #group_state{ db_name=DbName, init_args=InitArgs, - group=Group#group{dbname=DbName}, + group=Group, ref_counter=erlang:monitor(process,Fd)}} end; Error -> @@ -118,16 +120,16 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> handle_call({request_group, RequestSeq}, From, #group_state{ + db_name=DbName, group=#group{current_seq=Seq}=Group, updater_pid=nil, waiting_list=WaitList }=State) when RequestSeq > Seq -> Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group, DbName) end), {noreply, State#group_state{ updater_pid=Pid, - group=Group, waiting_list=[{From,RequestSeq}|WaitList] }, infinity}; @@ -148,40 +150,28 @@ handle_call({request_group, RequestSeq}, From, waiting_list=[{From, RequestSeq}|WaitList] }, infinity}; -handle_call({start_compact, CompactFun}, _From, State) -> - {noreply, NewState} = handle_cast({start_compact, CompactFun}, State), - {reply, {ok, NewState#group_state.compactor_pid}, NewState}; - handle_call(request_group_info, _From, State) -> GroupInfo = get_group_info(State), - {reply, {ok, GroupInfo}, State}. + {reply, {ok, GroupInfo}, State}; -handle_cast({update_group, RequestSeq}, - #group_state{ - group=#group{current_seq=Seq}=Group, - updater_pid=nil}=State) when RequestSeq > Seq -> - Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), - {noreply, State#group_state{updater_pid=Pid}}; -handle_cast({update_group, _RequestSeq}, State) -> - {noreply, State}; - -handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil} +handle_call({start_compact, CompactFun}, _From, #group_state{compactor_pid=nil} = State) -> #group_state{ - group = #group{dbname = DbName, name = GroupId, sig = GroupSig} = Group, - init_args = {RootDir, _, _} + group = #group{name = GroupId, sig = GroupSig} = Group, + init_args = {RootDir, DbName, _} } = State, ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]), + {ok, Db} = couch_db:open_int(DbName, []), {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), - NewGroup = reset_file(Fd, DbName, Group), - Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), - {noreply, State#group_state{compactor_pid = Pid}}; -handle_cast({start_compact, _}, State) -> + NewGroup = reset_file(Db, Fd, DbName, Group), + couch_db:close(Db), + Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end), + {reply, {ok, Pid}, State#group_state{compactor_pid = Pid}}; +handle_call({start_compact, _}, _From, #group_state{compactor_pid=Pid} = State) -> %% compact already running, this is a no-op - {noreply, State}; + {reply, {ok, Pid}, State}; -handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, +handle_call({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, _From, #group_state{group = #group{current_seq=OldSeq}} = State) when NewSeq >= OldSeq -> #group_state{ @@ -204,7 +194,7 @@ handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, unlink(UpdaterPid), exit(UpdaterPid, view_compaction_complete), Owner = self(), - spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end); + spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup, DbName) end); true -> nil end, @@ -216,30 +206,20 @@ handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, erlang:demonitor(RefCounter), self() ! delayed_commit, - {noreply, State#group_state{ + {reply, ok, State#group_state{ group=NewGroup, ref_counter=erlang:monitor(process,NewFd), compactor_pid=nil, updater_pid=NewUpdaterPid }}; -handle_cast({compact_done, NewGroup}, State) -> +handle_call({compact_done, NewGroup}, _From, State) -> #group_state{ group = #group{name = GroupId, current_seq = CurrentSeq}, init_args={_RootDir, DbName, _} } = State, ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++ "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]), - GroupServer = self(), - Pid = spawn_link(fun() -> - erlang:monitor(process, NewGroup#group.fd), - {_,Ref} = erlang:spawn_monitor(fun() -> - couch_view_updater:update(nil, NewGroup) - end), - receive {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> - gen_server:cast(GroupServer, {compact_done, NewGroup2}) - end - end), - {noreply, State#group_state{compactor_pid = Pid}}; + {reply, update, State}. handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid} = State) -> @@ -279,7 +259,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> end; handle_info({'EXIT', FromPid, {new_group, Group}}, - #group_state{ + #group_state{db_name=DbName, updater_pid=UpPid, ref_counter=RefCounter, waiting_list=WaitList, @@ -293,22 +273,25 @@ handle_info({'EXIT', FromPid, {new_group, Group}}, {noreply, State#group_state{waiting_commit=true, waiting_list=[], group=Group, updater_pid=nil}}; StillWaiting -> - % we still have some waiters, reupdate the index + % we still have some waiters, reopen the database and reupdate the index Owner = self(), - Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group) end), + Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end), {noreply, State#group_state{waiting_commit=true, - waiting_list=StillWaiting, group=Group, updater_pid=Pid}} + waiting_list=StillWaiting, updater_pid=Pid}} end; handle_info({'EXIT', _, {new_group, _}}, State) -> %% message from an old (probably pre-compaction) updater; ignore {noreply, State}; -handle_info({'EXIT', FromPid, reset}, #group_state{init_args=InitArgs, - updater_pid=FromPid}=State) -> +handle_info({'EXIT', UpPid, reset}, + #group_state{init_args=InitArgs, updater_pid=UpPid} = State) -> case prepare_group(InitArgs, true) of - {ok, ResetGroup} -> + {ok, Db, ResetGroup} -> Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end), + couch_db:close(Db), + Pid = spawn_link(fun() -> + couch_view_updater:update(Owner, ResetGroup, Db#db.name) + end), {noreply, State#group_state{ updater_pid=Pid, group=ResetGroup}}; @@ -368,29 +351,32 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList], State#group_state{waiting_list=[]}. -prepare_group({Root, DbName, #group{dbname=X}=G}, Reset) when X =/= DbName -> - prepare_group({Root, DbName, G#group{dbname=DbName}}, Reset); prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)-> - case open_index_file(RootDir, DbName, Sig) of - {ok, Fd} -> - if ForceReset -> - % this can happen if we missed a purge - {ok, reset_file(Fd, DbName, Group)}; - true -> - % 09 UPGRADE CODE - ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), - case (catch couch_file:read_header(Fd)) of - {ok, {Sig, HeaderInfo}} -> - % sigs match! - {ok, init_group(Fd, Group, HeaderInfo)}; - _ -> - % this happens on a new file - {ok, reset_file(Fd, DbName, Group)} - end + case couch_db:open_int(DbName, []) of + {ok, Db} -> + case open_index_file(RootDir, DbName, Sig) of + {ok, Fd} -> + if ForceReset -> + % this can happen if we missed a purge + {ok, Db, reset_file(Db, Fd, DbName, Group)}; + true -> + % 09 UPGRADE CODE + ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), + case (catch couch_file:read_header(Fd)) of + {ok, {Sig, HeaderInfo}} -> + % sigs match! + {ok, Db, init_group(Db, Fd, Group, HeaderInfo)}; + _ -> + % this happens on a new file + {ok, Db, reset_file(Db, Fd, DbName, Group)} + end + end; + Error -> + catch delete_index_file(RootDir, DbName, Sig), + Error end; - Error -> - catch delete_index_file(RootDir, DbName, Sig), - Error + Else -> + Else end. get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, @@ -497,7 +483,7 @@ open_db_group(DbName, GroupId) -> end) end), receive {'DOWN', Ref, process, Pid, {ok, Doc}} -> - {ok, design_doc_to_view_group(Doc)}; + {ok, design_doc_to_view_group(Doc)}; {'DOWN', Ref, process, Pid, Error} -> Error end. @@ -575,31 +561,26 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> end, 0, lists:sort(dict:to_list(DictBySrc))), set_view_sig(#group{name=Id, lib=Lib, views=Views, def_lang=Language, design_options=DesignOptions}). -reset_group(DbName, #group{views=Views}=Group) -> +reset_group(#group{views=Views}=Group) -> Views2 = [View#view{btree=nil} || View <- Views], - Group#group{dbname=DbName,fd=nil,query_server=nil,current_seq=0, + Group#group{fd=nil,query_server=nil,current_seq=0, id_btree=nil,views=Views2}. -reset_file(Fd, DbName, #group{sig=Sig,name=Name} = Group) -> +reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> ?LOG_DEBUG("Resetting group index \"~s\" in db ~s", [Name, DbName]), ok = couch_file:truncate(Fd, 0), ok = couch_file:write_header(Fd, {Sig, nil}), - init_group(Fd, reset_group(DbName, Group), nil). + init_group(Db, Fd, reset_group(Group), nil). delete_index_file(RootDir, DbName, GroupSig) -> couch_file:delete(RootDir, index_file_name(RootDir, DbName, GroupSig)). -init_group(Fd, #group{dbname=DbName, views=Views}=Group, nil) -> - case couch_db:open(DbName, []) of - {ok, Db} -> - PurgeSeq = try couch_db:get_purge_seq(Db) after couch_db:close(Db) end, - Header = #index_header{purge_seq=PurgeSeq, view_states=[{nil, 0, 0} || _ <- Views]}, - init_group(Fd, Group, Header); - {not_found, no_db_file} -> - ?LOG_ERROR("~p no_db_file ~p", [?MODULE, DbName]), - exit(no_db_file) - end; -init_group(Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> +init_group(Db, Fd, #group{views=Views}=Group, nil) -> + init_group(Db, Fd, Group, + #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), + id_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views]}); +init_group(_Db, Fd, #group{def_lang=Lang,views=Views}= + Group, IndexHeader) -> #index_header{seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, StateUpdate = fun diff --git a/apps/couch/src/couch_view_updater.erl b/apps/couch/src/couch_view_updater.erl index 90cb20d4..8238e3e5 100644 --- a/apps/couch/src/couch_view_updater.erl +++ b/apps/couch/src/couch_view_updater.erl @@ -12,29 +12,35 @@ -module(couch_view_updater). --export([update/2, do_maps/4, do_writes/5, load_docs/3]). +-export([update/3, do_maps/4, do_writes/5, load_docs/3]). -include("couch_db.hrl"). --spec update(_, #group{}) -> no_return(). +-spec update(_, #group{}, Dbname::binary()) -> no_return(). -update(Owner, Group) -> +update(Owner, Group, DbName) when is_binary(DbName) -> + {ok, Db} = couch_db:open_int(DbName, []), + try + update(Owner, Group, Db) + after + couch_db:close(Db) + end; + +update(Owner, Group, #db{name = DbName} = Db) -> #group{ - dbname = DbName, name = GroupName, current_seq = Seq, purge_seq = PurgeSeq } = Group, couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>), - {ok, Db} = couch_db:open(DbName, []), DbPurgeSeq = couch_db:get_purge_seq(Db), Group2 = if DbPurgeSeq == PurgeSeq -> Group; DbPurgeSeq == PurgeSeq + 1 -> couch_task_status:update(<<"Removing purged entries from view index.">>), - purge_index(Db, Group); + purge_index(Group, Db); true -> couch_task_status:update(<<"Resetting view index due to lost purge entries.">>), exit(reset) @@ -77,7 +83,7 @@ load_docs(DocInfo, _, {I, Db, MapQueue, DocOpts, IncludeDesign, Total} = Acc) -> load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign), {ok, setelement(1, Acc, I+1)}. -purge_index(Db, #group{views=Views, id_btree=IdBtree}=Group) -> +purge_index(#group{views=Views, id_btree=IdBtree}=Group, Db) -> {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), @@ -133,7 +139,7 @@ load_doc(Db, DI, MapQueue, DocOpts, IncludeDesign) -> couch_work_queue:queue(MapQueue, {Seq, Doc}) end end. - + -spec do_maps(#group{}, pid(), pid(), any()) -> any(). do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) -> case couch_work_queue:dequeue(MapQueue) of @@ -162,10 +168,10 @@ do_writes(Parent, Owner, Group, WriteQueue, InitialBuild) -> if Go =:= stop -> Parent ! {new_group, Group2}; true -> - case Owner of - nil -> ok; - _ -> ok = gen_server:cast(Owner, {partial_update, Parent, Group2}) - end, + case Owner of + nil -> ok; + _ -> ok = gen_server:cast(Owner, {partial_update, Parent, Group2}) + end, ?MODULE:do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild) end end. diff --git a/apps/couch/test/etap/020-btree-basics.t b/apps/couch/test/etap/020-btree-basics.t index 996d240a..c65d79c2 100755 --- a/apps/couch/test/etap/020-btree-basics.t +++ b/apps/couch/test/etap/020-btree-basics.t @@ -128,6 +128,7 @@ test_btree(Btree, KeyValues) -> ok = test_key_access(Btree, KeyValues), ok = test_lookup_access(Btree, KeyValues), ok = test_final_reductions(Btree, KeyValues), + ok = test_traversal_callbacks(Btree, KeyValues), true. test_add_remove(Btree, OutKeyValues, RemainingKeyValues) -> @@ -188,6 +189,18 @@ test_final_reductions(Btree, KeyValues) -> KVLen = FoldLRed + FoldRRed, ok. +test_traversal_callbacks(Btree, KeyValues) -> + FoldFun = + fun + (visit, GroupedKey, Unreduced, Acc) -> + {ok, Acc andalso false}; + (traverse, _LK, _Red, Acc) -> + {skip, Acc andalso true} + end, + % With 250 items the root is a kp. Always skipping should reduce to true. + {ok, _, true} = couch_btree:fold(Btree, FoldFun, true, [{dir, fwd}]), + ok. + shuffle(List) -> randomize(round(math:log(length(List)) + 0.5), List). diff --git a/apps/couch/test/etap/201-view-group-shutdown.t b/apps/couch/test/etap/201-view-group-shutdown.t new file mode 100755 index 00000000..03feac2b --- /dev/null +++ b/apps/couch/test/etap/201-view-group-shutdown.t @@ -0,0 +1,300 @@ +#!/usr/bin/env escript +%% -*- erlang -*- + +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-record(user_ctx, { + name = null, + roles = [], + handler +}). + +-record(db, { + main_pid = nil, + update_pid = nil, + compactor_pid = nil, + instance_start_time, % number of microsecs since jan 1 1970 as a binary string + fd, + fd_ref_counter, + header = nil, + committed_update_seq, + fulldocinfo_by_id_btree, + docinfo_by_seq_btree, + local_docs_btree, + update_seq, + name, + filepath, + validate_doc_funs = [], + security = [], + security_ptr = nil, + user_ctx = #user_ctx{}, + waiting_delayed_commit = nil, + revs_limit = 1000, + fsync_options = [], + is_sys_db = false +}). + +main_db_name() -> <<"couch_test_view_group_shutdown">>. + + +main(_) -> + test_util:init_code_path(), + + etap:plan(17), + case (catch test()) of + ok -> + etap:end_tests(); + Other -> + etap:diag(io_lib:format("Test died abnormally: ~p", [Other])), + etap:bail(Other) + end, + ok. + + +test() -> + couch_server_sup:start_link(test_util:config_files()), + ok = couch_config:set("couchdb", "max_dbs_open", "3", false), + ok = couch_config:set("couchdb", "delayed_commits", "false", false), + crypto:start(), + + % Test that while a view group is being compacted its database can not + % be closed by the database LRU system. + test_view_group_compaction(), + + couch_server_sup:stop(), + ok. + + +test_view_group_compaction() -> + {ok, DbWriter3} = create_db(<<"couch_test_view_group_shutdown_w3">>), + ok = couch_db:close(DbWriter3), + + {ok, MainDb} = create_main_db(), + ok = couch_db:close(MainDb), + + {ok, DbWriter1} = create_db(<<"couch_test_view_group_shutdown_w1">>), + ok = couch_db:close(DbWriter1), + + {ok, DbWriter2} = create_db(<<"couch_test_view_group_shutdown_w2">>), + ok = couch_db:close(DbWriter2), + + Writer1 = spawn_writer(DbWriter1#db.name), + Writer2 = spawn_writer(DbWriter2#db.name), + etap:is(is_process_alive(Writer1), true, "Spawned writer 1"), + etap:is(is_process_alive(Writer2), true, "Spawned writer 2"), + + etap:is(get_writer_status(Writer1), ok, "Writer 1 opened his database"), + etap:is(get_writer_status(Writer2), ok, "Writer 2 opened his database"), + + {ok, CompactPid} = couch_view_compactor:start_compact( + MainDb#db.name, <<"foo">>), + MonRef = erlang:monitor(process, CompactPid), + + % Add some more docs to database and trigger view update + {ok, MainDb2} = couch_db:open_int(MainDb#db.name, []), + ok = populate_main_db(MainDb2, 3, 3), + update_view(MainDb2#db.name, <<"_design/foo">>, <<"foo">>), + ok = couch_db:close(MainDb2), + + % Assuming the view compaction takes more than 50ms to complete + ok = timer:sleep(50), + Writer3 = spawn_writer(DbWriter3#db.name), + etap:is(is_process_alive(Writer3), true, "Spawned writer 3"), + + etap:is(get_writer_status(Writer3), {error, all_dbs_active}, + "Writer 3 got {error, all_dbs_active} when opening his database"), + + etap:is(is_process_alive(Writer1), true, "Writer 1 still alive"), + etap:is(is_process_alive(Writer2), true, "Writer 2 still alive"), + etap:is(is_process_alive(Writer3), true, "Writer 3 still alive"), + + receive + {'DOWN', MonRef, process, CompactPid, normal} -> + etap:diag("View group compaction successful"), + ok; + {'DOWN', MonRef, process, CompactPid, _Reason} -> + etap:bail("Failure compacting view group") + end, + + ok = timer:sleep(2000), + + etap:is(writer_try_again(Writer3), ok, + "Told writer 3 to try open his database again"), + etap:is(get_writer_status(Writer3), ok, + "Writer 3 was able to open his database"), + + etap:is(is_process_alive(Writer1), true, "Writer 1 still alive"), + etap:is(is_process_alive(Writer2), true, "Writer 2 still alive"), + etap:is(is_process_alive(Writer3), true, "Writer 3 still alive"), + + etap:is(stop_writer(Writer1), ok, "Stopped writer 1"), + etap:is(stop_writer(Writer2), ok, "Stopped writer 2"), + etap:is(stop_writer(Writer3), ok, "Stopped writer 3"), + + delete_db(MainDb), + delete_db(DbWriter1), + delete_db(DbWriter2), + delete_db(DbWriter3). + + +create_main_db() -> + {ok, Db} = create_db(main_db_name()), + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, <<"_design/foo">>}, + {<<"language">>, <<"javascript">>}, + {<<"views">>, {[ + {<<"foo">>, {[ + {<<"map">>, <<"function(doc) { emit(doc._id, doc); }">>} + ]}}, + {<<"foo2">>, {[ + {<<"map">>, <<"function(doc) { emit(doc._id, doc); }">>} + ]}}, + {<<"foo3">>, {[ + {<<"map">>, <<"function(doc) { emit(doc._id, doc); }">>} + ]}}, + {<<"foo4">>, {[ + {<<"map">>, <<"function(doc) { emit(doc._id, doc); }">>} + ]}}, + {<<"foo5">>, {[ + {<<"map">>, <<"function(doc) { emit(doc._id, doc); }">>} + ]}} + ]}} + ]}), + {ok, _} = couch_db:update_doc(Db, DDoc, []), + ok = populate_main_db(Db, 1000, 20000), + update_view(Db#db.name, <<"_design/foo">>, <<"foo">>), + {ok, Db}. + + +populate_main_db(Db, BatchSize, N) when N > 0 -> + Docs = lists:map( + fun(_) -> + couch_doc:from_json_obj({[ + {<<"_id">>, couch_uuids:new()}, + {<<"value">>, base64:encode(crypto:rand_bytes(1000))} + ]}) + end, + lists:seq(1, BatchSize)), + {ok, _} = couch_db:update_docs(Db, Docs, []), + populate_main_db(Db, BatchSize, N - length(Docs)); +populate_main_db(_Db, _, _) -> + ok. + + +update_view(DbName, DDocName, ViewName) -> + % Use a dedicated process - we can't explicitly drop the #group ref counter + Pid = spawn(fun() -> + {ok, Db} = couch_db:open_int(DbName, []), + couch_view:get_map_view(Db, DDocName, ViewName, false), + ok = couch_db:close(Db) + end), + MonRef = erlang:monitor(process, Pid), + receive + {'DOWN', MonRef, process, Pid, normal} -> + etap:diag("View group updated"), + ok; + {'DOWN', MonRef, process, Pid, _Reason} -> + etap:bail("Failure updating view group") + end. + + +create_db(DbName) -> + {ok, Db} = couch_db:create( + DbName, + [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]), + {ok, Db}. + + +delete_db(#db{name = DbName, main_pid = Pid}) -> + ok = couch_server:delete( + DbName, [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]), + MonRef = erlang:monitor(process, Pid), + receive + {'DOWN', MonRef, process, Pid, _Reason} -> + ok + after 30000 -> + etap:bail("Timeout deleting database") + end. + + +spawn_writer(DbName) -> + Parent = self(), + spawn(fun() -> + process_flag(priority, high), + writer_loop(DbName, Parent) + end). + + +get_writer_status(Writer) -> + Ref = make_ref(), + Writer ! {get_status, Ref}, + receive + {db_open, Ref} -> + ok; + {db_open_error, Error, Ref} -> + Error + after 5000 -> + timeout + end. + + +writer_try_again(Writer) -> + Ref = make_ref(), + Writer ! {try_again, Ref}, + receive + {ok, Ref} -> + ok + after 5000 -> + timeout + end. + + +stop_writer(Writer) -> + Ref = make_ref(), + Writer ! {stop, Ref}, + receive + {ok, Ref} -> + ok + after 5000 -> + etap:bail("Timeout stopping writer process") + end. + + +% Just keep the database open, no need to actually do something on it. +writer_loop(DbName, Parent) -> + case couch_db:open_int(DbName, []) of + {ok, Db} -> + writer_loop_1(Db, Parent); + Error -> + writer_loop_2(DbName, Parent, Error) + end. + +writer_loop_1(Db, Parent) -> + receive + {get_status, Ref} -> + Parent ! {db_open, Ref}, + writer_loop_1(Db, Parent); + {stop, Ref} -> + ok = couch_db:close(Db), + Parent ! {ok, Ref} + end. + +writer_loop_2(DbName, Parent, Error) -> + receive + {get_status, Ref} -> + Parent ! {db_open_error, Error, Ref}, + writer_loop_2(DbName, Parent, Error); + {try_again, Ref} -> + Parent ! {ok, Ref}, + writer_loop(DbName, Parent) + end. diff --git a/apps/couch/test/etap/210-os-proc-pool.t b/apps/couch/test/etap/210-os-proc-pool.t new file mode 100755 index 00000000..d80707e8 --- /dev/null +++ b/apps/couch/test/etap/210-os-proc-pool.t @@ -0,0 +1,163 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +main(_) -> + test_util:init_code_path(), + + etap:plan(21), + case (catch test()) of + ok -> + etap:end_tests(); + Other -> + etap:diag(io_lib:format("Test died abnormally: ~p", [Other])), + etap:bail(Other) + end, + ok. + + +test() -> + couch_server_sup:start_link(test_util:config_files()), + couch_config:set("query_server_config", "os_process_limit", "3", false), + + test_pool_full(), + test_client_unexpected_exit(), + + couch_server_sup:stop(), + ok. + + +test_pool_full() -> + Client1 = spawn_client(), + Client2 = spawn_client(), + Client3 = spawn_client(), + + etap:diag("Check that we can spawn the max number of processes."), + etap:is(ping_client(Client1), ok, "Client 1 started ok."), + etap:is(ping_client(Client2), ok, "Client 2 started ok."), + etap:is(ping_client(Client3), ok, "Client 3 started ok."), + + Proc1 = get_client_proc(Client1, "1"), + Proc2 = get_client_proc(Client2, "2"), + Proc3 = get_client_proc(Client3, "3"), + etap:isnt(Proc1, Proc2, "Clients 1 and 2 got different procs."), + etap:isnt(Proc2, Proc3, "Clients 2 and 3 got different procs."), + etap:isnt(Proc1, Proc3, "Clients 1 and 3 got different procs."), + + etap:diag("Check that client 4 blocks waiting for a process."), + Client4 = spawn_client(), + etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."), + + etap:diag("Check that stopping a client gives up its process."), + etap:is(stop_client(Client1), ok, "First client stopped."), + + etap:diag("And check that our blocked process has been unblocked."), + etap:is(ping_client(Client4), ok, "Client was unblocked."), + + Proc4 = get_client_proc(Client4, "4"), + etap:is(Proc4, Proc1, "Client 4 got proc that client 1 got before."), + + lists:map(fun(C) -> ok = stop_client(C) end, [Client2, Client3, Client4]). + + +test_client_unexpected_exit() -> + Client1 = spawn_client(), + Client2 = spawn_client(), + Client3 = spawn_client(), + + etap:diag("Check that up to os_process_limit clients started."), + etap:is(ping_client(Client1), ok, "Client 1 started ok."), + etap:is(ping_client(Client2), ok, "Client 2 started ok."), + etap:is(ping_client(Client3), ok, "Client 3 started ok."), + + Proc1 = get_client_proc(Client1, "1"), + Proc2 = get_client_proc(Client2, "2"), + Proc3 = get_client_proc(Client3, "3"), + etap:isnt(Proc1, Proc2, "Clients 1 and 2 got different procs."), + etap:isnt(Proc2, Proc3, "Clients 2 and 3 got different procs."), + etap:isnt(Proc1, Proc3, "Clients 1 and 3 got different procs."), + + etap:diag("Check that killing a client frees an os_process."), + etap:is(kill_client(Client1), ok, "Client 1 died all right."), + + etap:diag("Check that a new client is not blocked on boot."), + Client4 = spawn_client(), + etap:is(ping_client(Client4), ok, "New client booted without blocking."), + + Proc4 = get_client_proc(Client4, "4"), + etap:isnt(Proc4, Proc1, + "Client 4 got a proc different from the one client 1 got before."), + etap:isnt(Proc4, Proc2, "Client 4's proc different from client 2's proc."), + etap:isnt(Proc4, Proc3, "Client 4's proc different from client 3's proc."), + + lists:map(fun(C) -> ok = stop_client(C) end, [Client2, Client3, Client4]). + + +spawn_client() -> + Parent = self(), + Ref = make_ref(), + Pid = spawn(fun() -> + Proc = couch_query_servers:get_os_process(<<"javascript">>), + loop(Parent, Ref, Proc) + end), + {Pid, Ref}. + + +ping_client({Pid, Ref}) -> + Pid ! ping, + receive + {pong, Ref} -> ok + after 3000 -> timeout + end. + + +get_client_proc({Pid, Ref}, ClientName) -> + Pid ! get_proc, + receive + {proc, Ref, Proc} -> Proc + after 3000 -> + etap:bail("Timeout getting client " ++ ClientName ++ " proc.") + end. + + +stop_client({Pid, Ref}) -> + Pid ! stop, + receive + {stop, Ref} -> ok + after 3000 -> timeout + end. + + +kill_client({Pid, Ref}) -> + Pid ! die, + receive + {die, Ref} -> ok + after 3000 -> timeout + end. + + +loop(Parent, Ref, Proc) -> + receive + ping -> + Parent ! {pong, Ref}, + loop(Parent, Ref, Proc); + get_proc -> + Parent ! {proc, Ref, Proc}, + loop(Parent, Ref, Proc); + stop -> + couch_query_servers:ret_os_process(Proc), + Parent ! {stop, Ref}; + die -> + Parent ! {die, Ref}, + exit(some_error) + end. diff --git a/apps/couch/test/javascript/run.tpl b/apps/couch/test/javascript/run.tpl index c5abe6e7..1389a4f9 100644 --- a/apps/couch/test/javascript/run.tpl +++ b/apps/couch/test/javascript/run.tpl @@ -27,4 +27,4 @@ cat $SCRIPT_DIR/json2.js \ $SCRIPT_DIR/test/*.js \ $JS_TEST_DIR/couch_http.js \ $JS_TEST_DIR/cli_runner.js \ - | $COUCHJS - + | $COUCHJS --http - |