diff options
Diffstat (limited to 'apps/couch/src')
23 files changed, 423 insertions, 278 deletions
diff --git a/apps/couch/src/couch_btree.erl b/apps/couch/src/couch_btree.erl index 3f2e86d8..52fcaece 100644 --- a/apps/couch/src/couch_btree.erl +++ b/apps/couch/src/couch_btree.erl @@ -15,6 +15,7 @@ -export([open/2, open/3, query_modify/4, add/2, add_remove/3]). -export([fold/4, full_reduce/1, final_reduce/2, foldl/3, foldl/4]). -export([fold_reduce/4, lookup/2, get_state/1, set_options/2]). +-export([less/3]). -record(btree, {fd, @@ -109,9 +110,17 @@ full_reduce(#btree{root={_P, Red}, reduce=Reduce}) -> % wraps a 2 arity function with the proper 3 arity function convert_fun_arity(Fun) when is_function(Fun, 2) -> - fun(KV, _Reds, AccIn) -> Fun(KV, AccIn) end; + fun + (visit, KV, _Reds, AccIn) -> Fun(KV, AccIn); + (traverse, _K, _Red, AccIn) -> {ok, AccIn} + end; convert_fun_arity(Fun) when is_function(Fun, 3) -> - Fun. % Already arity 3 + fun + (visit, KV, Reds, AccIn) -> Fun(KV, Reds, AccIn); + (traverse, _K, _Red, AccIn) -> {ok, AccIn} + end; +convert_fun_arity(Fun) when is_function(Fun, 4) -> + Fun. % Already arity 4 make_key_in_end_range_function(Bt, fwd, Options) -> case couch_util:get_value(end_key_gt, Options) of @@ -614,12 +623,17 @@ stream_node(Bt, Reds, {Pointer, _Reds}, InRange, Dir, Fun, Acc) -> stream_kp_node(_Bt, _Reds, [], _InRange, _Dir, _Fun, Acc) -> {ok, Acc}; -stream_kp_node(Bt, Reds, [{_Key, {Pointer, Red}} | Rest], InRange, Dir, Fun, Acc) -> - case stream_node(Bt, Reds, {Pointer, Red}, InRange, Dir, Fun, Acc) of +stream_kp_node(Bt, Reds, [{Key, {Pointer, Red}} | Rest], InRange, Dir, Fun, Acc) -> + case Fun(traverse, Key, Red, Acc) of {ok, Acc2} -> - stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc2); - {stop, LastReds, Acc2} -> - {stop, LastReds, Acc2} + case stream_node(Bt, Reds, {Pointer, Red}, InRange, Dir, Fun, Acc2) of + {ok, Acc3} -> + stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc3); + {stop, LastReds, Acc3} -> + {stop, LastReds, Acc3} + end; + {skip, Acc2} -> + stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc2) end. drop_nodes(_Bt, Reds, _StartKey, []) -> @@ -680,7 +694,7 @@ stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], InRange, Dir, Fun, Acc) -> {stop, {PrevKVs, Reds}, Acc}; true -> AssembledKV = assemble(Bt, K, V), - case Fun(AssembledKV, {PrevKVs, Reds}, Acc) of + case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of {ok, Acc2} -> stream_kv_node2(Bt, Reds, [AssembledKV | PrevKVs], RestKVs, InRange, Dir, Fun, Acc2); {stop, Acc2} -> diff --git a/apps/couch/src/couch_changes.erl b/apps/couch/src/couch_changes.erl index 44d0ad46..4f2857b6 100644 --- a/apps/couch/src/couch_changes.erl +++ b/apps/couch/src/couch_changes.erl @@ -268,7 +268,6 @@ start_sending_changes(Callback, UserAcc, ResponseType) -> send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend) -> #changes_args{ - style = Style, include_docs = IncludeDocs, conflicts = Conflicts, limit = Limit, @@ -278,7 +277,6 @@ send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend) -> } = Args, couch_db:changes_since( Db, - Style, StartSeq, fun changes_enumerator/2, [{dir, Dir}], diff --git a/apps/couch/src/couch_db.erl b/apps/couch/src/couch_db.erl index 96c49886..c01b0a35 100644 --- a/apps/couch/src/couch_db.erl +++ b/apps/couch/src/couch_db.erl @@ -23,7 +23,7 @@ -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). -export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]). -export([set_security/2,get_security/1]). --export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]). +-export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]). -export([check_is_admin/1, check_is_reader/1, get_doc_count/1]). -export([reopen/1, make_doc/5]). @@ -293,8 +293,11 @@ get_design_docs(#db{name = <<"shards/", _/binary>> = ShardName}) -> Response end; get_design_docs(#db{id_tree=Btree}=Db) -> - {ok,_, Docs} = couch_btree:fold(Btree, - fun(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) -> + {ok, _, Docs} = couch_view:fold( + #view{btree=Btree}, + fun(#full_doc_info{deleted = true}, _Reds, AccDocs) -> + {ok, AccDocs}; + (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) -> {ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []), {ok, [Doc | AccDocs]}; (_, _Reds, AccDocs) -> @@ -987,10 +990,10 @@ enum_docs_reduce_to_count(Reds) -> fun couch_db_updater:btree_by_id_reduce/2, Reds), Count. -changes_since(Db, Style, StartSeq, Fun, Acc) -> - changes_since(Db, Style, StartSeq, Fun, [], Acc). - -changes_since(Db, Style, StartSeq, Fun, Options, Acc) -> +changes_since(Db, StartSeq, Fun, Acc) -> + changes_since(Db, StartSeq, Fun, [], Acc). + +changes_since(Db, StartSeq, Fun, Options, Acc) -> Wrapper = fun(FullDocInfo, _Offset, Acc2) -> case FullDocInfo of #full_doc_info{} -> @@ -998,17 +1001,7 @@ changes_since(Db, Style, StartSeq, Fun, Options, Acc) -> #doc_info{} -> DocInfo = FullDocInfo end, - #doc_info{revs=Revs} = DocInfo, - DocInfo2 = - case Style of - main_only -> - DocInfo; - all_docs -> - % remove revs before the seq - DocInfo#doc_info{revs=[RevInfo || - #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]} - end, - Fun(DocInfo2, Acc2) + Fun(DocInfo, Acc2) end, {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, Wrapper, Acc, [{start_key, couch_util:to_integer(StartSeq) + 1} | Options]), @@ -1028,7 +1021,8 @@ enum_docs_since(Db, SinceSeq, InFun, Acc, Options) -> {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}. enum_docs(Db, InFun, InAcc, Options) -> - {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.id_tree, InFun, InAcc, Options), + {ok, LastReduce, OutAcc} = couch_view:fold( + #view{btree=Db#db.id_tree}, InFun, InAcc, Options), {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}. %%% Internal function %%% diff --git a/apps/couch/src/couch_doc.erl b/apps/couch/src/couch_doc.erl index 33d7e3cf..63ac0892 100644 --- a/apps/couch/src/couch_doc.erl +++ b/apps/couch/src/couch_doc.erl @@ -302,12 +302,20 @@ to_doc_info(FullDocInfo) -> {DocInfo, _Path} = to_doc_info_path(FullDocInfo), DocInfo. -max_seq([], Max) -> - Max; -max_seq([#rev_info{seq=Seq}|Rest], Max) -> - max_seq(Rest, if Max > Seq -> Max; true -> Seq end). +max_seq(Tree, UpdateSeq) -> + FoldFun = fun({_Pos, _Key}, Value, _Type, MaxOldSeq) -> + case Value of + {_Deleted, _DiskPos, OldTreeSeq} -> + erlang:max(MaxOldSeq, OldTreeSeq); + #leaf{seq=LeafSeq} -> + erlang:max(MaxOldSeq, LeafSeq); + _ -> + MaxOldSeq + end + end, + couch_key_tree:fold(FoldFun, UpdateSeq, Tree). -to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree}) -> +to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree,update_seq=FDISeq}) -> RevInfosAndPath = [{#rev_info{deleted=Del,body_sp=Bp,seq=Seq,rev={Pos,RevId}}, Path} || {#leaf{deleted=Del, ptr=Bp, seq=Seq},{Pos, [RevId|_]}=Path} <- @@ -320,7 +328,7 @@ to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree}) -> end, RevInfosAndPath), [{_RevInfo, WinPath}|_] = SortedRevInfosAndPath, RevInfos = [RevInfo || {RevInfo, _Path} <- SortedRevInfosAndPath], - {#doc_info{id=Id, high_seq=max_seq(RevInfos, 0), revs=RevInfos}, WinPath}. + {#doc_info{id=Id, high_seq=max_seq(Tree, FDISeq), revs=RevInfos}, WinPath}. diff --git a/apps/couch/src/couch_httpd.erl b/apps/couch/src/couch_httpd.erl index 8fb2687c..602bdf2b 100644 --- a/apps/couch/src/couch_httpd.erl +++ b/apps/couch/src/couch_httpd.erl @@ -469,16 +469,24 @@ body_length(Req) -> Unknown -> {unknown_transfer_encoding, Unknown} end. -body(#httpd{mochi_req=MochiReq, req_body=ReqBody}) -> - case ReqBody of +body(#httpd{mochi_req=MochiReq, req_body=undefined} = Req) -> + case body_length(Req) of undefined -> - % Maximum size of document PUT request body (4GB) MaxSize = list_to_integer( couch_config:get("couchdb", "max_document_size", "4294967296")), MochiReq:recv_body(MaxSize); - _Else -> - ReqBody - end. + chunked -> + ChunkFun = fun({0, _Footers}, Acc) -> + lists:reverse(Acc); + ({_Len, Chunk}, Acc) -> + [Chunk | Acc] + end, + recv_chunked(Req, 8192, ChunkFun, []); + Len -> + MochiReq:recv_body(Len) + end; +body(#httpd{req_body=ReqBody}) -> + ReqBody. json_body(Httpd) -> ?JSON_DECODE(body(Httpd)). @@ -619,25 +627,25 @@ send_json(Req, Code, Value) -> send_json(Req, Code, [], Value). send_json(Req, Code, Headers, Value) -> + initialize_jsonp(Req), DefaultHeaders = [ {"Content-Type", negotiate_content_type(Req)}, {"Cache-Control", "must-revalidate"} ], - Body = [start_jsonp(Req), ?JSON_ENCODE(Value), end_jsonp(), $\n], + Body = [start_jsonp(), ?JSON_ENCODE(Value), end_jsonp(), $\n], send_response(Req, Code, DefaultHeaders ++ Headers, Body). start_json_response(Req, Code) -> start_json_response(Req, Code, []). start_json_response(Req, Code, Headers) -> + initialize_jsonp(Req), DefaultHeaders = [ {"Content-Type", negotiate_content_type(Req)}, {"Cache-Control", "must-revalidate"} ], - start_jsonp(Req), % Validate before starting chunked. - %start_chunked_response(Req, Code, DefaultHeaders ++ Headers). {ok, Resp} = start_chunked_response(Req, Code, DefaultHeaders ++ Headers), - case start_jsonp(Req) of + case start_jsonp() of [] -> ok; Start -> send_chunk(Resp, Start) end, @@ -647,7 +655,7 @@ end_json_response(Resp) -> send_chunk(Resp, end_jsonp() ++ [$\n]), last_chunk(Resp). -start_jsonp(Req) -> +initialize_jsonp(Req) -> case get(jsonp) of undefined -> put(jsonp, qs_value(Req, "callback", no_jsonp)); _ -> ok @@ -660,14 +668,9 @@ start_jsonp(Req) -> % make sure jsonp is configured on (default off) case couch_config:get("httpd", "allow_jsonp", "false") of "true" -> - validate_callback(CallBack), - CallBack ++ "("; + validate_callback(CallBack); _Else -> - % this could throw an error message, but instead we just ignore the - % jsonp parameter - % throw({bad_request, <<"JSONP must be configured before using.">>}) - put(jsonp, no_jsonp), - [] + put(jsonp, no_jsonp) end catch Error -> @@ -676,6 +679,13 @@ start_jsonp(Req) -> end end. +start_jsonp() -> + case get(jsonp) of + no_jsonp -> []; + [] -> []; + CallBack -> CallBack ++ "(" + end. + end_jsonp() -> Resp = case get(jsonp) of no_jsonp -> []; @@ -836,7 +846,14 @@ send_redirect(Req, Path) -> Headers = [{"Location", couch_httpd:absolute_uri(Req, Path)}], send_response(Req, 301, Headers, <<>>). -negotiate_content_type(#httpd{mochi_req=MochiReq}) -> +negotiate_content_type(Req) -> + case get(jsonp) of + no_jsonp -> negotiate_content_type1(Req); + [] -> negotiate_content_type1(Req); + _Callback -> "text/javascript" + end. + +negotiate_content_type1(#httpd{mochi_req=MochiReq}) -> %% Determine the appropriate Content-Type header for a JSON response %% depending on the Accept header in the request. A request that explicitly %% lists the correct JSON MIME type will get that type, otherwise the diff --git a/apps/couch/src/couch_httpd_db.erl b/apps/couch/src/couch_httpd_db.erl index 71204598..0bf97e26 100644 --- a/apps/couch/src/couch_httpd_db.erl +++ b/apps/couch/src/couch_httpd_db.erl @@ -128,7 +128,7 @@ handle_changes_req1(Req, Db) -> handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, Db) -> ok = couch_db:check_is_admin(Db), couch_httpd:validate_ctype(Req, "application/json"), - ok = couch_view_compactor:start_compact(DbName, Id), + {ok, _} = couch_view_compactor:start_compact(DbName, Id), send_json(Req, 202, {[{ok, true}]}); handle_compact_req(#httpd{method='POST'}=Req, Db) -> @@ -477,6 +477,7 @@ db_req(#httpd{path_parts=[_, DocId | FileNameParts]}=Req, Db) -> db_attachment_req(Req, Db, DocId, FileNameParts). all_docs_view(Req, Db, Keys) -> + RawCollator = fun(A, B) -> A < B end, #view_query_args{ start_key = StartKey, start_docid = StartDocId, @@ -486,7 +487,8 @@ all_docs_view(Req, Db, Keys) -> skip = SkipCount, direction = Dir, inclusive_end = Inclusive - } = QueryArgs = couch_httpd_view:parse_view_params(Req, Keys, map), + } = QueryArgs + = couch_httpd_view:parse_view_params(Req, Keys, map, RawCollator), {ok, Info} = couch_db:get_db_info(Db), CurrentEtag = couch_httpd:make_etag(Info), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> diff --git a/apps/couch/src/couch_httpd_show.erl b/apps/couch/src/couch_httpd_show.erl index 59f74e1c..58f046e4 100644 --- a/apps/couch/src/couch_httpd_show.erl +++ b/apps/couch/src/couch_httpd_show.erl @@ -106,13 +106,15 @@ get_fun_key(DDoc, Type, Name) -> % send_method_not_allowed(Req, "POST,PUT,DELETE,ETC"); handle_doc_update_req(#httpd{ - path_parts=[_, _, _, _, UpdateName, DocId] + path_parts=[_, _, _, _, UpdateName, DocId|Rest] }=Req, Db, DDoc) -> - Doc = try couch_httpd_db:couch_doc_open(Db, DocId, nil, [conflicts]) + DocParts = [DocId|Rest], + DocId1 = ?l2b(string:join([?b2l(P)|| P <- DocParts], "/")), + Doc = try couch_httpd_db:couch_doc_open(Db, DocId1, nil, [conflicts]) catch _ -> nil end, - send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId); + send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId1); handle_doc_update_req(#httpd{ path_parts=[_, _, _, _, UpdateName] @@ -125,7 +127,7 @@ handle_doc_update_req(Req, _Db, _DDoc) -> send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) -> JsonReq = couch_httpd_external:json_req_obj(Req, Db, DocId), JsonDoc = couch_query_servers:json_doc(Doc), - {Code, JsonResp1} = case couch_query_servers:ddoc_prompt(DDoc, + JsonResp1 = case couch_query_servers:ddoc_prompt(DDoc, [<<"updates">>, UpdateName], [JsonDoc, JsonReq]) of [<<"up">>, {NewJsonDoc}, {JsonResp}] -> Options = case couch_httpd:header_value(Req, "X-Couch-Full-Commit", @@ -138,16 +140,14 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) -> NewDoc = couch_doc:from_json_obj({NewJsonDoc}), {ok, NewRev} = couch_db:update_doc(Db, NewDoc, Options), NewRevStr = couch_doc:rev_to_str(NewRev), - JsonRespWithRev = {[{<<"headers">>, - {[{<<"X-Couch-Update-NewRev">>, NewRevStr}]}} | JsonResp]}, - {201, JsonRespWithRev}; - [<<"up">>, _Other, JsonResp] -> - {200, JsonResp} + {[{<<"code">>, 201}, {<<"headers">>, + {[{<<"X-Couch-Update-NewRev">>, NewRevStr}]}} | JsonResp]}; + [<<"up">>, _Other, {JsonResp}] -> + {[{<<"code">>, 200} | JsonResp]} end, - - JsonResp2 = couch_util:json_apply_field({<<"code">>, Code}, JsonResp1), + % todo set location field - couch_httpd_external:send_external_response(Req, JsonResp2). + couch_httpd_external:send_external_response(Req, JsonResp1). % view-list request with view and list from same design doc. @@ -190,14 +190,14 @@ handle_view_list_req(Req, _Db, _DDoc) -> handle_view_list(Req, Db, DDoc, LName, {ViewDesignName, ViewName}, Keys) -> ViewDesignId = <<"_design/", ViewDesignName/binary>>, {ViewType, View, Group, QueryArgs} = couch_httpd_view:load_view(Req, Db, {ViewDesignId, ViewName}, Keys), - Etag = list_etag(Req, Db, Group, View, {couch_httpd:doc_etag(DDoc), Keys}), + Etag = list_etag(Req, Db, Group, View, QueryArgs, {couch_httpd:doc_etag(DDoc), Keys}), couch_httpd:etag_respond(Req, Etag, fun() -> output_list(ViewType, Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys, Group) end). -list_etag(#httpd{user_ctx=UserCtx}=Req, Db, Group, View, More) -> +list_etag(#httpd{user_ctx=UserCtx}=Req, Db, Group, View, QueryArgs, More) -> Accept = couch_httpd:header_value(Req, "Accept"), - couch_httpd_view:view_etag(Db, Group, View, {More, Accept, UserCtx#user_ctx.roles}). + couch_httpd_view:view_etag(Db, Group, View, QueryArgs, {More, Accept, UserCtx#user_ctx.roles}). output_list(map, Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys, Group) -> output_map_list(Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys, Group); diff --git a/apps/couch/src/couch_httpd_vhost.erl b/apps/couch/src/couch_httpd_vhost.erl index 9bfb5951..03dd02ae 100644 --- a/apps/couch/src/couch_httpd_vhost.erl +++ b/apps/couch/src/couch_httpd_vhost.erl @@ -216,15 +216,19 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. +append_path("/"=_Target, "/"=_Path) -> + "/"; +append_path(Target, Path) -> + Target ++ Path. % default redirect vhost handler redirect_to_vhost(MochiReq, VhostTarget) -> Path = MochiReq:get(raw_path), - Target = VhostTarget ++ Path, + Target = append_path(VhostTarget, Path), ?LOG_DEBUG("Vhost Target: '~p'~n", [Target]), - + Headers = mochiweb_headers:enter("x-couchdb-vhost-path", Path, MochiReq:get(headers)), @@ -356,8 +360,8 @@ split_host_port(HostAsString) -> {split_host(HostAsString), '*'}; N -> HostPart = string:substr(HostAsString, 1, N-1), - case (catch erlang:list_to_integer(HostAsString, N+1, - length(HostAsString))) of + case (catch erlang:list_to_integer(string:substr(HostAsString, + N+1, length(HostAsString)))) of {'EXIT', _} -> {split_host(HostAsString), '*'}; Port -> diff --git a/apps/couch/src/couch_httpd_view.erl b/apps/couch/src/couch_httpd_view.erl index b71fc2c6..082a5039 100644 --- a/apps/couch/src/couch_httpd_view.erl +++ b/apps/couch/src/couch_httpd_view.erl @@ -15,9 +15,9 @@ -export([handle_view_req/3,handle_temp_view_req/2]). --export([parse_view_params/3]). +-export([parse_view_params/4]). -export([make_view_fold_fun/7, finish_view_fold/4, finish_view_fold/5, view_row_obj/4]). --export([view_etag/3, view_etag/4, make_reduce_fold_funs/6]). +-export([view_etag/5, make_reduce_fold_funs/6]). -export([design_doc_view/5, parse_bool_param/1, doc_member/3]). -export([make_key_options/1, load_view/4]). @@ -34,18 +34,19 @@ design_doc_view(Req, Db, DName, ViewName, Keys) -> Reduce = get_reduce_type(Req), Result = case couch_view:get_map_view(Db, DesignId, ViewName, Stale) of {ok, View, Group} -> - QueryArgs = parse_view_params(Req, Keys, map), + QueryArgs = parse_view_params(Req, Keys, map, view_collator(View)), output_map_view(Req, View, Group, Db, QueryArgs, Keys); {not_found, Reason} -> case couch_view:get_reduce_view(Db, DesignId, ViewName, Stale) of {ok, ReduceView, Group} -> + Collator = view_collator(ReduceView), case Reduce of false -> - QueryArgs = parse_view_params(Req, Keys, red_map), + QueryArgs = parse_view_params(Req, Keys, red_map, Collator), MapView = couch_view:extract_map_view(ReduceView), output_map_view(Req, MapView, Group, Db, QueryArgs, Keys); _ -> - QueryArgs = parse_view_params(Req, Keys, reduce), + QueryArgs = parse_view_params(Req, Keys, reduce, Collator), output_reduce_view(Req, Db, ReduceView, Group, QueryArgs, Keys) end; _ -> @@ -90,19 +91,19 @@ handle_temp_view_req(#httpd{method='POST'}=Req, Db) -> Reduce = get_reduce_type(Req), case couch_util:get_value(<<"reduce">>, Props, null) of null -> - QueryArgs = parse_view_params(Req, Keys, map), {ok, View, Group} = couch_view:get_temp_map_view(Db, Language, DesignOptions, MapSrc), + QueryArgs = parse_view_params(Req, Keys, map, view_collator(View)), output_map_view(Req, View, Group, Db, QueryArgs, Keys); _ when Reduce =:= false -> - QueryArgs = parse_view_params(Req, Keys, red_map), {ok, View, Group} = couch_view:get_temp_map_view(Db, Language, DesignOptions, MapSrc), + QueryArgs = parse_view_params(Req, Keys, red_map, view_collator(View)), output_map_view(Req, View, Group, Db, QueryArgs, Keys); RedSrc -> - QueryArgs = parse_view_params(Req, Keys, reduce), {ok, View, Group} = couch_view:get_temp_reduce_view(Db, Language, DesignOptions, MapSrc, RedSrc), + QueryArgs = parse_view_params(Req, Keys, reduce, view_collator(View)), output_reduce_view(Req, Db, View, Group, QueryArgs, Keys) end; @@ -114,7 +115,7 @@ output_map_view(Req, View, Group, Db, QueryArgs, nil) -> limit = Limit, skip = SkipCount } = QueryArgs, - CurrentEtag = view_etag(Db, Group, View), + CurrentEtag = view_etag(Db, Group, View, QueryArgs), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> {ok, RowCount} = couch_view:get_row_count(View), FoldlFun = make_view_fold_fun(Req, QueryArgs, CurrentEtag, Db, Group#group.current_seq, RowCount, #view_fold_helper_funs{reduce_count=fun couch_view:reduce_to_count/1}), @@ -130,7 +131,7 @@ output_map_view(Req, View, Group, Db, QueryArgs, Keys) -> limit = Limit, skip = SkipCount } = QueryArgs, - CurrentEtag = view_etag(Db, Group, View, Keys), + CurrentEtag = view_etag(Db, Group, View, QueryArgs, Keys), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> {ok, RowCount} = couch_view:get_row_count(View), FoldAccInit = {Limit, SkipCount, undefined, []}, @@ -155,7 +156,7 @@ output_reduce_view(Req, Db, View, Group, QueryArgs, nil) -> skip = Skip, group_level = GroupLevel } = QueryArgs, - CurrentEtag = view_etag(Db, Group, View), + CurrentEtag = view_etag(Db, Group, View, QueryArgs), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> {ok, GroupRowsFun, RespFun} = make_reduce_fold_funs(Req, GroupLevel, QueryArgs, CurrentEtag, Group#group.current_seq, @@ -173,7 +174,7 @@ output_reduce_view(Req, Db, View, Group, QueryArgs, Keys) -> skip = Skip, group_level = GroupLevel } = QueryArgs, - CurrentEtag = view_etag(Db, Group, View, Keys), + CurrentEtag = view_etag(Db, Group, View, QueryArgs, Keys), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> {ok, GroupRowsFun, RespFun} = make_reduce_fold_funs(Req, GroupLevel, QueryArgs, CurrentEtag, Group#group.current_seq, @@ -209,18 +210,19 @@ load_view(Req, Db, {ViewDesignId, ViewName}, Keys) -> Reduce = get_reduce_type(Req), case couch_view:get_map_view(Db, ViewDesignId, ViewName, Stale) of {ok, View, Group} -> - QueryArgs = parse_view_params(Req, Keys, map), + QueryArgs = parse_view_params(Req, Keys, map, view_collator(View)), {map, View, Group, QueryArgs}; {not_found, _Reason} -> case couch_view:get_reduce_view(Db, ViewDesignId, ViewName, Stale) of {ok, ReduceView, Group} -> + Collator = view_collator(ReduceView), case Reduce of false -> - QueryArgs = parse_view_params(Req, Keys, map_red), + QueryArgs = parse_view_params(Req, Keys, map_red, Collator), MapView = couch_view:extract_map_view(ReduceView), {map, MapView, Group, QueryArgs}; _ -> - QueryArgs = parse_view_params(Req, Keys, reduce), + QueryArgs = parse_view_params(Req, Keys, reduce, Collator), {reduce, ReduceView, Group, QueryArgs} end; {not_found, Reason} -> @@ -228,12 +230,30 @@ load_view(Req, Db, {ViewDesignId, ViewName}, Keys) -> end end. +view_collator({reduce, _N, _Lang, View}) -> + view_collator(View); + +view_collator({temp_reduce, View}) -> + view_collator(View); + +view_collator(#view{btree=Btree}) -> + % Return an "is-less-than" predicate by calling into the btree's + % collator. For raw collation, couch_btree compares arbitrary + % Erlang terms, but for normal (ICU) collation, it expects + % {Json, Id} tuples. + fun + ({_JsonA, _IdA}=A, {_JsonB, _IdB}=B) -> + couch_btree:less(Btree, A, B); + (JsonA, JsonB) -> + couch_btree:less(Btree, {JsonA, null}, {JsonB, null}) + end. + % query_parse_error could be removed % we wouldn't need to pass the view type, it'd just parse params. % I'm not sure what to do about the error handling, but % it might simplify things to have a parse_view_params function % that doesn't throw(). -parse_view_params(Req, Keys, ViewType) -> +parse_view_params(Req, Keys, ViewType, LessThan) -> QueryList = couch_httpd:qs(Req), QueryParams = lists:foldl(fun({K, V}, Acc) -> @@ -247,7 +267,7 @@ parse_view_params(Req, Keys, ViewType) -> QueryArgs = lists:foldl(fun({K, V}, Args2) -> validate_view_query(K, V, Args2) end, Args, lists:reverse(QueryParams)), % Reverse to match QS order. - warn_on_empty_key_range(QueryArgs), + warn_on_empty_key_range(QueryArgs, LessThan), GroupLevel = QueryArgs#view_query_args.group_level, case {ViewType, GroupLevel, IsMultiGet} of {reduce, exact, true} -> @@ -328,15 +348,15 @@ parse_view_param("callback", _) -> parse_view_param(Key, Value) -> [{extra, {Key, Value}}]. -warn_on_empty_key_range(#view_query_args{start_key=undefined}) -> +warn_on_empty_key_range(#view_query_args{start_key=undefined}, _Lt) -> ok; -warn_on_empty_key_range(#view_query_args{end_key=undefined}) -> +warn_on_empty_key_range(#view_query_args{end_key=undefined}, _Lt) -> ok; -warn_on_empty_key_range(#view_query_args{start_key=A, end_key=A}) -> +warn_on_empty_key_range(#view_query_args{start_key=A, end_key=A}, _Lt) -> ok; warn_on_empty_key_range(#view_query_args{ - start_key=StartKey, end_key=EndKey, direction=Dir}) -> - case {Dir, couch_view:less_json(StartKey, EndKey)} of + start_key=StartKey, end_key=EndKey, direction=Dir}, LessThan) -> + case {Dir, LessThan(StartKey, EndKey)} of {fwd, false} -> throw({query_parse_error, <<"No rows can match your key range, reverse your ", @@ -640,14 +660,16 @@ send_json_reduce_row(Resp, {Key, Value}, RowFront) -> send_chunk(Resp, RowFront ++ ?JSON_ENCODE({[{key, Key}, {value, Value}]})), {ok, ",\r\n"}. -view_etag(Db, Group, View) -> - view_etag(Db, Group, View, nil). +view_etag(Db, Group, View, QueryArgs) -> + view_etag(Db, Group, View, QueryArgs, nil). -view_etag(Db, Group, {reduce, _, _, View}, Extra) -> - view_etag(Db, Group, View, Extra); -view_etag(Db, Group, {temp_reduce, View}, Extra) -> - view_etag(Db, Group, View, Extra); -view_etag(_Db, #group{sig=Sig}, #view{update_seq=UpdateSeq, purge_seq=PurgeSeq}, Extra) -> +view_etag(Db, Group, {reduce, _, _, View}, QueryArgs, Extra) -> + view_etag(Db, Group, View, QueryArgs, Extra); +view_etag(Db, Group, {temp_reduce, View}, QueryArgs, Extra) -> + view_etag(Db, Group, View, QueryArgs, Extra); +view_etag(_Db, #group{sig=Sig, current_seq=CurrentSeq}, _View, #view_query_args{include_docs=true}, Extra) -> + couch_httpd:make_etag({Sig, CurrentSeq, Extra}); +view_etag(_Db, #group{sig=Sig}, #view{update_seq=UpdateSeq, purge_seq=PurgeSeq}, _QueryArgs, Extra) -> couch_httpd:make_etag({Sig, UpdateSeq, PurgeSeq, Extra}). % the view row has an error diff --git a/apps/couch/src/couch_key_tree.erl b/apps/couch/src/couch_key_tree.erl index 2f3c6abf..5e24e0f7 100644 --- a/apps/couch/src/couch_key_tree.erl +++ b/apps/couch/src/couch_key_tree.erl @@ -50,7 +50,7 @@ -export([merge/3, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2, compute_data_size/1]). -export([map/2, get_all_leafs/1, count_leafs/1, remove_leafs/2, - get_all_leafs_full/1,stem/2,map_leafs/2]). + get_all_leafs_full/1,stem/2,map_leafs/2, fold/3]). -include("couch_db.hrl"). @@ -373,6 +373,21 @@ tree_fold_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree], Acc) -> Acc2 end. +fold(_Fun, Acc, []) -> + Acc; +fold(Fun, Acc0, [{Pos, Tree}|Rest]) -> + Acc1 = fold_simple(Fun, Acc0, Pos, [Tree]), + fold(Fun, Acc1, Rest). + +fold_simple(_Fun, Acc, _Pos, []) -> + Acc; +fold_simple(Fun, Acc0, Pos, [{Key, Value, SubTree} | RestTree]) -> + Type = if SubTree == [] -> leaf; true -> branch end, + Acc1 = Fun({Pos, Key}, Value, Type, Acc0), + Acc2 = fold_simple(Fun, Acc1, Pos+1, SubTree), + fold_simple(Fun, Acc2, Pos, RestTree). + + map(_Fun, []) -> []; map(Fun, [{Pos, Tree}|Rest]) -> diff --git a/apps/couch/src/couch_log.erl b/apps/couch/src/couch_log.erl index 9bac7450..362d092d 100644 --- a/apps/couch/src/couch_log.erl +++ b/apps/couch/src/couch_log.erl @@ -25,22 +25,12 @@ -define(LEVEL_TMI, 0). debug(Format, Args) -> - case debug_on() of - false -> - ok; - true -> - {ConsoleMsg, FileMsg} = get_log_messages(self(), debug, Format, Args), - gen_event:sync_notify(error_logger, {couch_debug, ConsoleMsg, FileMsg}) - end. + {ConsoleMsg, FileMsg} = get_log_messages(self(), debug, Format, Args), + gen_event:sync_notify(error_logger, {couch_debug, ConsoleMsg, FileMsg}). info(Format, Args) -> - case info_on() of - false -> - ok; - true -> - {ConsoleMsg, FileMsg} = get_log_messages(self(), info, Format, Args), - gen_event:sync_notify(error_logger, {couch_info, ConsoleMsg, FileMsg}) - end. + {ConsoleMsg, FileMsg} = get_log_messages(self(), info, Format, Args), + gen_event:sync_notify(error_logger, {couch_info, ConsoleMsg, FileMsg}). error(Format, Args) -> {ConsoleMsg, FileMsg} = get_log_messages(self(), error, Format, Args), @@ -180,6 +170,15 @@ get_log_messages(Pid, Level, Format, Args) -> read(Bytes, Offset) -> LogFileName = couch_config:get("log", "file"), LogFileSize = filelib:file_size(LogFileName), + MaxChunkSize = list_to_integer( + couch_config:get("httpd", "log_max_chunk_size", "1000000")), + case Bytes > MaxChunkSize of + true -> + throw({bad_request, "'bytes' cannot exceed " ++ + integer_to_list(MaxChunkSize)}); + false -> + ok + end, {ok, Fd} = file:open(LogFileName, [read]), Start = lists:max([LogFileSize - Bytes, 0]) + Offset, @@ -188,4 +187,5 @@ read(Bytes, Offset) -> % TODO: make streaming {ok, Chunk} = file:pread(Fd, Start, LogFileSize), + ok = file:close(Fd), Chunk. diff --git a/apps/couch/src/couch_os_process.erl b/apps/couch/src/couch_os_process.erl index 7fe8aa89..2a6d92a7 100644 --- a/apps/couch/src/couch_os_process.erl +++ b/apps/couch/src/couch_os_process.erl @@ -173,10 +173,7 @@ handle_info({Port, {exit_status, 0}}, #os_proc{port=Port}=OsProc) -> {stop, normal, OsProc}; handle_info({Port, {exit_status, Status}}, #os_proc{port=Port}=OsProc) -> ?LOG_ERROR("OS Process died with status: ~p", [Status]), - {stop, {exit_status, Status}, OsProc}; -handle_info(Msg, OsProc) -> - ?LOG_DEBUG("OS Proc: Unknown info: ~p", [Msg]), - {noreply, OsProc}. + {stop, {exit_status, Status}, OsProc}. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/apps/couch/src/couch_query_servers.erl b/apps/couch/src/couch_query_servers.erl index be7c465b..9714a0ca 100644 --- a/apps/couch/src/couch_query_servers.erl +++ b/apps/couch/src/couch_query_servers.erl @@ -17,9 +17,9 @@ -export([filter_docs/5]). -export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]). --export([get_os_process/1, ret_os_process/1]). -% -export([test/0]). +% For 210-os-proc-pool.t +-export([get_os_process/1, ret_os_process/1]). -include("couch_db.hrl"). diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index 2d011aab..46bcb282 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -20,6 +20,7 @@ -export([start_replication/4, end_replication/1, get_result/4]). -include("couch_db.hrl"). +-include("couch_js_functions.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). -define(REP_ID_VERSION, 2). @@ -110,20 +111,20 @@ checkpoint(Server) -> gen_server:cast(Server, do_checkpoint). get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) -> - case couch_util:get_value(<<"continuous">>, Props, false) of - true -> - {ok, {continuous, ?l2b(BaseId)}}; - false -> - try gen_server:call(Server, get_result, infinity) of - retry -> replicate(PostBody, UserCtx); - Else -> Else - catch - exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} -> - %% oops, this replication just finished -- restart it. - replicate(PostBody, UserCtx); - exit:{normal, {gen_server, call, [Server, get_result , infinity]}} -> - %% we made the call during terminate - replicate(PostBody, UserCtx) + case couch_util:get_value(<<"continuous">>, Props, false) of + true -> + {ok, {continuous, ?l2b(BaseId)}}; + false -> + try gen_server:call(Server, get_result, infinity) of + retry -> replicate(PostBody, UserCtx); + Else -> Else + catch + exit:{noproc, {gen_server, call, [Server, get_result, infinity]}} -> + %% oops, this replication just finished -- restart it. + replicate(PostBody, UserCtx); + exit:{normal, {gen_server, call, [Server, get_result, infinity]}} -> + %% we made the call during terminate + replicate(PostBody, UserCtx) end end. @@ -154,13 +155,13 @@ do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx, Module] = InitArgs) -> [SourceLog, TargetLog] = find_replication_logs( [Source, Target], BaseId, {PostProps}, UserCtx), - {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), + {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), - {ok, ChangesFeed} = + {ok, ChangesFeed} = couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps), - {ok, MissingRevs} = + {ok, MissingRevs} = couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps), - {ok, Reader} = + {ok, Reader} = couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps), {ok, Writer} = couch_rep_writer:start_link(self(), Target, Reader, PostProps), @@ -545,7 +546,7 @@ filter_code(Filter, Props, UserCtx) -> DocErrorMsg = io_lib:format( "Couldn't open document `_design/~s` from source " "database `~s`: ~s", - [dbname(Source), DDocName, couch_util:to_binary(DocError)]), + [DDocName, dbname(Source), couch_util:to_binary(DocError)]), throw({error, iolist_to_binary(DocErrorMsg)}) end, Code = couch_util:get_nested_json_value( @@ -649,18 +650,18 @@ open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) -> try - case CreateTarget of - true -> - ok = couch_httpd:verify_is_server_admin(UserCtx), - couch_server:create(DbName, [{user_ctx, UserCtx}]); + case CreateTarget of + true -> + ok = couch_httpd:verify_is_server_admin(UserCtx), + couch_server:create(DbName, [{user_ctx, UserCtx}]); false -> ok - end, + end, - case couch_db:open(DbName, [{user_ctx, UserCtx}]) of - {ok, Db} -> - couch_db:monitor(Db), - Db; + case couch_db:open(DbName, [{user_ctx, UserCtx}]) of + {ok, Db} -> + couch_db:monitor(Db), + Db; {not_found, no_db_file} -> throw({db_not_found, DbName}) end diff --git a/apps/couch/src/couch_rep_changes_feed.erl b/apps/couch/src/couch_rep_changes_feed.erl index 36fe82aa..7a9573d6 100644 --- a/apps/couch/src/couch_rep_changes_feed.erl +++ b/apps/couch/src/couch_rep_changes_feed.erl @@ -154,7 +154,7 @@ init([Parent, #http_db{headers = Headers0} = Source, Since, PostProps]) -> end; {ibrowse_async_headers, ReqId, Code, _} -> {stop, {changes_error_code, list_to_integer(Code)}} - after 10000 -> + after 30000 -> {stop, changes_timeout} end; @@ -491,13 +491,30 @@ purge_req_messages(ReqId) -> ok end. -queue_changes_row(Row, #state{doc_ids = nil, count = Count, rows = Rows}) -> - {queue:in(Row, Rows), Count + 1}; +queue_changes_row(Row, #state{doc_ids = nil} = State) -> + maybe_queue_row(Row, State); queue_changes_row({RowProps} = Row, - #state{doc_ids = Ids, count = Count, rows = Rows}) -> + #state{doc_ids = Ids, count = Count, rows = Rows} = State) -> case lists:member(get_value(<<"id">>, RowProps), Ids) of true -> - {queue:in(Row, Rows), Count + 1}; + maybe_queue_row(Row, State); false -> {Rows, Count} end. + +maybe_queue_row({Props} = Row, #state{count = Count, rows = Rows} = State) -> + case get_value(<<"id">>, Props) of + <<>> -> + [_, Db | _] = State#state.init_args, + ?LOG_ERROR("Replicator: ignoring document with empty ID in source " + "database `~s` (_changes sequence ~p)", + [dbname(Db), couch_util:get_value(<<"seq">>, Props)]), + {Rows, Count}; + _ -> + {queue:in(Row, Rows), Count + 1} + end. + +dbname(#http_db{url = Url}) -> + couch_util:url_strip_password(Url); +dbname(#db{name = Name}) -> + Name. diff --git a/apps/couch/src/couch_rep_reader.erl b/apps/couch/src/couch_rep_reader.erl index 0d344e5c..1e8ca074 100644 --- a/apps/couch/src/couch_rep_reader.erl +++ b/apps/couch/src/couch_rep_reader.erl @@ -244,7 +244,7 @@ reader_loop(ReaderServer, Parent, Source, MissingRevsServer) -> case couch_rep_missing_revs:next(MissingRevsServer) of complete -> exit(complete); - {HighSeq, IdsRevs} -> + {_HighSeq, IdsRevs} -> % to be safe, make sure Results are sorted by source_seq SortedIdsRevs = lists:keysort(2, IdsRevs), RequestSeqs = [S || {_,S,_} <- SortedIdsRevs], @@ -255,8 +255,8 @@ reader_loop(ReaderServer, Parent, Source, MissingRevsServer) -> infinity) || {Id,Seq,Revs} <- SortedIdsRevs], reader_loop(ReaderServer, Parent, Source, MissingRevsServer); _Local -> - {ok, Source1} = gen_server:call(Parent, get_source_db, infinity), - Source2 = maybe_reopen_db(Source1, HighSeq), + {ok, Source2} = couch_db:open( + Source#db.name, [{user_ctx, Source#db.user_ctx}]), lists:foreach(fun({Id,Seq,Revs}) -> {ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]), JustTheDocs = [Doc || {ok, Doc} <- Docs], @@ -268,12 +268,6 @@ reader_loop(ReaderServer, Parent, Source, MissingRevsServer) -> end end. -maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq -> - {ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]), - NewDb; -maybe_reopen_db(Db, _HighSeq) -> - Db. - spawn_document_request(Source, Id, Seq, Revs) -> Server = self(), SpawnFun = fun() -> diff --git a/apps/couch/src/couch_rep_writer.erl b/apps/couch/src/couch_rep_writer.erl index 2b722e8e..40323925 100644 --- a/apps/couch/src/couch_rep_writer.erl +++ b/apps/couch/src/couch_rep_writer.erl @@ -26,7 +26,8 @@ writer_loop(Parent, Reader) -> ok; {HighSeq, Docs} -> DocCount = length(Docs), - {ok, Target} = gen_server:call(Parent, get_target_db, infinity), + {ok, Target0} = gen_server:call(Parent, get_target_db, infinity), + Target = open_db(Target0), try write_docs(Target, Docs) of {ok, []} -> Parent ! {update_stats, docs_written, DocCount}; @@ -38,6 +39,8 @@ writer_loop(Parent, Reader) -> {attachment_request_failed, Err} -> ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]), exit({attachment_request_failed, Err, Docs}) + after + close_db(Target) end, Parent ! {writer_checkpoint, HighSeq}, couch_rep_att:cleanup(), @@ -163,3 +166,14 @@ write_docs_1({Props}) -> ErrId = couch_util:to_existing_atom(couch_util:get_value(<<"error">>, Props)), Reason = couch_util:get_value(<<"reason">>, Props), {{Id, Rev}, {ErrId, Reason}}. + +open_db(#db{name = Name, user_ctx = UserCtx}) -> + {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx}]), + Db; +open_db(HttpDb) -> + HttpDb. + +close_db(#db{} = Db) -> + couch_db:close(Db); +close_db(_HttpDb) -> + ok. diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl index 3f7cc27c..3715cea1 100644 --- a/apps/couch/src/couch_replication_manager.erl +++ b/apps/couch/src/couch_replication_manager.erl @@ -317,7 +317,7 @@ process_update(State, {Change}) -> <<"error">> -> case ets:lookup(?DOC_TO_REP, DocId) of [] -> - maybe_start_replication(State, DocId, JsonRepDoc); + maybe_start_replication(State, DocId, JsonRepDoc); _ -> State end diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl index 05174245..8d479d7e 100644 --- a/apps/couch/src/couch_view.erl +++ b/apps/couch/src/couch_view.erl @@ -260,8 +260,16 @@ fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) -> fold(#view{btree=Btree}, Fun, Acc, Options) -> WrapperFun = - fun(KV, Reds, Acc2) -> - fold_fun(Fun, expand_dups([KV],[]), Reds, Acc2) + fun(visit, KV, Reds, Acc2) -> + fold_fun(Fun, expand_dups([KV],[]), Reds, Acc2); + (traverse, LK, Red, Acc2) + when is_function(Fun, 4) -> + Fun(traverse, LK, Red, Acc2); + (traverse, _LK, Red, {_, Skip, _, _} = Acc2) + when Skip >= element(1, Red) -> + {skip, setelement(2, Acc2, Skip - element(1, Red))}; + (traverse, _, _, Acc2) -> + {ok, Acc2} end, {ok, _LastReduce, _AccResult} = couch_btree:fold(Btree, WrapperFun, Acc, Options). diff --git a/apps/couch/src/couch_view_compactor.erl b/apps/couch/src/couch_view_compactor.erl index 69aaff00..8ea1dca2 100644 --- a/apps/couch/src/couch_view_compactor.erl +++ b/apps/couch/src/couch_view_compactor.erl @@ -20,14 +20,14 @@ %% @doc Compacts the views. GroupId must not include the _design/ prefix start_compact(DbName, GroupId) -> Pid = couch_view:get_group_server(DbName, <<"_design/",GroupId/binary>>), - gen_server:cast(Pid, {start_compact, fun compact_group/2}). + gen_server:call(Pid, {start_compact, fun compact_group/3}). %%============================================================================= %% internal functions %%============================================================================= %% @spec compact_group(Group, NewGroup) -> ok -compact_group(Group, EmptyGroup) -> +compact_group(Group, EmptyGroup, DbName) -> #group{ current_seq = Seq, id_btree = IdBtree, @@ -36,10 +36,8 @@ compact_group(Group, EmptyGroup) -> } = Group, #group{ - dbname = DbName, fd = Fd, id_btree = EmptyIdBtree, - sig = Sig, views = EmptyViews } = EmptyGroup, @@ -82,9 +80,26 @@ compact_group(Group, EmptyGroup) -> views=NewViews, current_seq=Seq }, - - Pid = ets:lookup_element(group_servers_by_sig, {DbName, Sig}, 2), - gen_server:cast(Pid, {compact_done, NewGroup}). + maybe_retry_compact(Db, GroupId, NewGroup). + +maybe_retry_compact(#db{name = DbName} = Db, GroupId, NewGroup) -> + #group{sig = Sig, fd = NewFd} = NewGroup, + Header = {Sig, couch_view_group:get_index_header_data(NewGroup)}, + ok = couch_file:write_header(NewFd, Header), + Pid = ets:lookup_element(group_servers_by_sig, {DbName, Sig}, 2), + case gen_server:call(Pid, {compact_done, NewGroup}) of + ok -> + couch_db:close(Db); + update -> + {ok, Db2} = couch_db:reopen(Db), + {_, Ref} = erlang:spawn_monitor(fun() -> + couch_view_updater:update(nil, NewGroup, Db2) + end), + receive + {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> + maybe_retry_compact(Db2, GroupId, NewGroup2) + end + end. %% @spec compact_view(View, EmptyView) -> CompactView compact_view(View, EmptyView) -> diff --git a/apps/couch/src/couch_view_group.erl b/apps/couch/src/couch_view_group.erl index 11cb4c60..75644d6b 100644 --- a/apps/couch/src/couch_view_group.erl +++ b/apps/couch/src/couch_view_group.erl @@ -17,6 +17,9 @@ -export([start_link/1, request_group/2, trigger_group_update/2, request_group_info/1]). -export([open_db_group/2, open_temp_group/5, design_doc_to_view_group/1,design_root/2]). +%% Exports for the compactor +-export([get_index_header_data/1]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -80,8 +83,7 @@ start_link(InitArgs) -> init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), case prepare_group(InitArgs, false) of - {ok, #group{fd=Fd, current_seq=Seq}=Group} -> - {ok, Db} = couch_db:open(DbName, []), + {ok, Db, #group{fd=Fd, current_seq=Seq}=Group} -> case Seq > couch_db:get_update_seq(Db) of true -> ReturnPid ! {Ref, self(), {error, invalid_view_seq}}, @@ -92,7 +94,7 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> {ok, #group_state{ db_name=DbName, init_args=InitArgs, - group=Group#group{dbname=DbName}, + group=Group, ref_counter=erlang:monitor(process,Fd)}} end; Error -> @@ -118,16 +120,16 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> handle_call({request_group, RequestSeq}, From, #group_state{ + db_name=DbName, group=#group{current_seq=Seq}=Group, updater_pid=nil, waiting_list=WaitList }=State) when RequestSeq > Seq -> Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group, DbName) end), {noreply, State#group_state{ updater_pid=Pid, - group=Group, waiting_list=[{From,RequestSeq}|WaitList] }, infinity}; @@ -148,40 +150,28 @@ handle_call({request_group, RequestSeq}, From, waiting_list=[{From, RequestSeq}|WaitList] }, infinity}; -handle_call({start_compact, CompactFun}, _From, State) -> - {noreply, NewState} = handle_cast({start_compact, CompactFun}, State), - {reply, {ok, NewState#group_state.compactor_pid}, NewState}; - handle_call(request_group_info, _From, State) -> GroupInfo = get_group_info(State), - {reply, {ok, GroupInfo}, State}. + {reply, {ok, GroupInfo}, State}; -handle_cast({update_group, RequestSeq}, - #group_state{ - group=#group{current_seq=Seq}=Group, - updater_pid=nil}=State) when RequestSeq > Seq -> - Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), - {noreply, State#group_state{updater_pid=Pid}}; -handle_cast({update_group, _RequestSeq}, State) -> - {noreply, State}; - -handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil} +handle_call({start_compact, CompactFun}, _From, #group_state{compactor_pid=nil} = State) -> #group_state{ - group = #group{dbname = DbName, name = GroupId, sig = GroupSig} = Group, - init_args = {RootDir, _, _} + group = #group{name = GroupId, sig = GroupSig} = Group, + init_args = {RootDir, DbName, _} } = State, ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]), + {ok, Db} = couch_db:open_int(DbName, []), {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), - NewGroup = reset_file(Fd, DbName, Group), - Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), - {noreply, State#group_state{compactor_pid = Pid}}; -handle_cast({start_compact, _}, State) -> + NewGroup = reset_file(Db, Fd, DbName, Group), + couch_db:close(Db), + Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end), + {reply, {ok, Pid}, State#group_state{compactor_pid = Pid}}; +handle_call({start_compact, _}, _From, #group_state{compactor_pid=Pid} = State) -> %% compact already running, this is a no-op - {noreply, State}; + {reply, {ok, Pid}, State}; -handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, +handle_call({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, _From, #group_state{group = #group{current_seq=OldSeq}} = State) when NewSeq >= OldSeq -> #group_state{ @@ -204,7 +194,7 @@ handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, unlink(UpdaterPid), exit(UpdaterPid, view_compaction_complete), Owner = self(), - spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end); + spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup, DbName) end); true -> nil end, @@ -216,30 +206,30 @@ handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, erlang:demonitor(RefCounter), self() ! delayed_commit, - {noreply, State#group_state{ + {reply, ok, State#group_state{ group=NewGroup, ref_counter=erlang:monitor(process,NewFd), compactor_pid=nil, updater_pid=NewUpdaterPid }}; -handle_cast({compact_done, NewGroup}, State) -> +handle_call({compact_done, NewGroup}, _From, State) -> #group_state{ group = #group{name = GroupId, current_seq = CurrentSeq}, init_args={_RootDir, DbName, _} } = State, ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++ "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]), - GroupServer = self(), - Pid = spawn_link(fun() -> - erlang:monitor(process, NewGroup#group.fd), - {_,Ref} = erlang:spawn_monitor(fun() -> - couch_view_updater:update(nil, NewGroup) - end), - receive {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> - gen_server:cast(GroupServer, {compact_done, NewGroup2}) - end - end), - {noreply, State#group_state{compactor_pid = Pid}}; + {reply, update, State}. + +handle_cast({update_group, RequestSeq}, + #group_state{ + group=#group{current_seq=Seq}=Group, + updater_pid=nil}=State) when RequestSeq > Seq -> + Owner = self(), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), + {noreply, State#group_state{updater_pid=Pid}}; +handle_cast({update_group, _RequestSeq}, State) -> + {noreply, State}; handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid} = State) -> @@ -279,7 +269,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> end; handle_info({'EXIT', FromPid, {new_group, Group}}, - #group_state{ + #group_state{db_name=DbName, updater_pid=UpPid, ref_counter=RefCounter, waiting_list=WaitList, @@ -293,22 +283,25 @@ handle_info({'EXIT', FromPid, {new_group, Group}}, {noreply, State#group_state{waiting_commit=true, waiting_list=[], group=Group, updater_pid=nil}}; StillWaiting -> - % we still have some waiters, reupdate the index + % we still have some waiters, reopen the database and reupdate the index Owner = self(), - Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group) end), + Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end), {noreply, State#group_state{waiting_commit=true, - waiting_list=StillWaiting, group=Group, updater_pid=Pid}} + waiting_list=StillWaiting, updater_pid=Pid}} end; handle_info({'EXIT', _, {new_group, _}}, State) -> %% message from an old (probably pre-compaction) updater; ignore {noreply, State}; -handle_info({'EXIT', FromPid, reset}, #group_state{init_args=InitArgs, - updater_pid=FromPid}=State) -> +handle_info({'EXIT', UpPid, reset}, + #group_state{init_args=InitArgs, updater_pid=UpPid} = State) -> case prepare_group(InitArgs, true) of - {ok, ResetGroup} -> + {ok, Db, ResetGroup} -> Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end), + couch_db:close(Db), + Pid = spawn_link(fun() -> + couch_view_updater:update(Owner, ResetGroup, Db#db.name) + end), {noreply, State#group_state{ updater_pid=Pid, group=ResetGroup}}; @@ -368,29 +361,32 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList], State#group_state{waiting_list=[]}. -prepare_group({Root, DbName, #group{dbname=X}=G}, Reset) when X =/= DbName -> - prepare_group({Root, DbName, G#group{dbname=DbName}}, Reset); prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)-> - case open_index_file(RootDir, DbName, Sig) of - {ok, Fd} -> - if ForceReset -> - % this can happen if we missed a purge - {ok, reset_file(Fd, DbName, Group)}; - true -> - % 09 UPGRADE CODE - ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), - case (catch couch_file:read_header(Fd)) of - {ok, {Sig, HeaderInfo}} -> - % sigs match! - {ok, init_group(Fd, Group, HeaderInfo)}; - _ -> - % this happens on a new file - {ok, reset_file(Fd, DbName, Group)} - end + case couch_db:open_int(DbName, []) of + {ok, Db} -> + case open_index_file(RootDir, DbName, Sig) of + {ok, Fd} -> + if ForceReset -> + % this can happen if we missed a purge + {ok, Db, reset_file(Db, Fd, DbName, Group)}; + true -> + % 09 UPGRADE CODE + ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), + case (catch couch_file:read_header(Fd)) of + {ok, {Sig, HeaderInfo}} -> + % sigs match! + {ok, Db, init_group(Db, Fd, Group, HeaderInfo)}; + _ -> + % this happens on a new file + {ok, Db, reset_file(Db, Fd, DbName, Group)} + end + end; + Error -> + catch delete_index_file(RootDir, DbName, Sig), + Error end; - Error -> - catch delete_index_file(RootDir, DbName, Sig), - Error + Else -> + Else end. get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, @@ -497,7 +493,7 @@ open_db_group(DbName, GroupId) -> end) end), receive {'DOWN', Ref, process, Pid, {ok, Doc}} -> - {ok, design_doc_to_view_group(Doc)}; + {ok, design_doc_to_view_group(Doc)}; {'DOWN', Ref, process, Pid, Error} -> Error end. @@ -575,31 +571,26 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> end, 0, lists:sort(dict:to_list(DictBySrc))), set_view_sig(#group{name=Id, lib=Lib, views=Views, def_lang=Language, design_options=DesignOptions}). -reset_group(DbName, #group{views=Views}=Group) -> +reset_group(#group{views=Views}=Group) -> Views2 = [View#view{btree=nil} || View <- Views], - Group#group{dbname=DbName,fd=nil,query_server=nil,current_seq=0, + Group#group{fd=nil,query_server=nil,current_seq=0, id_btree=nil,views=Views2}. -reset_file(Fd, DbName, #group{sig=Sig,name=Name} = Group) -> +reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> ?LOG_DEBUG("Resetting group index \"~s\" in db ~s", [Name, DbName]), ok = couch_file:truncate(Fd, 0), ok = couch_file:write_header(Fd, {Sig, nil}), - init_group(Fd, reset_group(DbName, Group), nil). + init_group(Db, Fd, reset_group(Group), nil). delete_index_file(RootDir, DbName, GroupSig) -> couch_file:delete(RootDir, index_file_name(RootDir, DbName, GroupSig)). -init_group(Fd, #group{dbname=DbName, views=Views}=Group, nil) -> - case couch_db:open(DbName, []) of - {ok, Db} -> - PurgeSeq = try couch_db:get_purge_seq(Db) after couch_db:close(Db) end, - Header = #index_header{purge_seq=PurgeSeq, view_states=[{nil, 0, 0} || _ <- Views]}, - init_group(Fd, Group, Header); - {not_found, no_db_file} -> - ?LOG_ERROR("~p no_db_file ~p", [?MODULE, DbName]), - exit(no_db_file) - end; -init_group(Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> +init_group(Db, Fd, #group{views=Views}=Group, nil) -> + init_group(Db, Fd, Group, + #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), + id_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views]}); +init_group(_Db, Fd, #group{def_lang=Lang,views=Views}= + Group, IndexHeader) -> #index_header{seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, StateUpdate = fun diff --git a/apps/couch/src/couch_view_updater.erl b/apps/couch/src/couch_view_updater.erl index 90cb20d4..8238e3e5 100644 --- a/apps/couch/src/couch_view_updater.erl +++ b/apps/couch/src/couch_view_updater.erl @@ -12,29 +12,35 @@ -module(couch_view_updater). --export([update/2, do_maps/4, do_writes/5, load_docs/3]). +-export([update/3, do_maps/4, do_writes/5, load_docs/3]). -include("couch_db.hrl"). --spec update(_, #group{}) -> no_return(). +-spec update(_, #group{}, Dbname::binary()) -> no_return(). -update(Owner, Group) -> +update(Owner, Group, DbName) when is_binary(DbName) -> + {ok, Db} = couch_db:open_int(DbName, []), + try + update(Owner, Group, Db) + after + couch_db:close(Db) + end; + +update(Owner, Group, #db{name = DbName} = Db) -> #group{ - dbname = DbName, name = GroupName, current_seq = Seq, purge_seq = PurgeSeq } = Group, couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>), - {ok, Db} = couch_db:open(DbName, []), DbPurgeSeq = couch_db:get_purge_seq(Db), Group2 = if DbPurgeSeq == PurgeSeq -> Group; DbPurgeSeq == PurgeSeq + 1 -> couch_task_status:update(<<"Removing purged entries from view index.">>), - purge_index(Db, Group); + purge_index(Group, Db); true -> couch_task_status:update(<<"Resetting view index due to lost purge entries.">>), exit(reset) @@ -77,7 +83,7 @@ load_docs(DocInfo, _, {I, Db, MapQueue, DocOpts, IncludeDesign, Total} = Acc) -> load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign), {ok, setelement(1, Acc, I+1)}. -purge_index(Db, #group{views=Views, id_btree=IdBtree}=Group) -> +purge_index(#group{views=Views, id_btree=IdBtree}=Group, Db) -> {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), @@ -133,7 +139,7 @@ load_doc(Db, DI, MapQueue, DocOpts, IncludeDesign) -> couch_work_queue:queue(MapQueue, {Seq, Doc}) end end. - + -spec do_maps(#group{}, pid(), pid(), any()) -> any(). do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) -> case couch_work_queue:dequeue(MapQueue) of @@ -162,10 +168,10 @@ do_writes(Parent, Owner, Group, WriteQueue, InitialBuild) -> if Go =:= stop -> Parent ! {new_group, Group2}; true -> - case Owner of - nil -> ok; - _ -> ok = gen_server:cast(Owner, {partial_update, Parent, Group2}) - end, + case Owner of + nil -> ok; + _ -> ok = gen_server:cast(Owner, {partial_update, Parent, Group2}) + end, ?MODULE:do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild) end end. diff --git a/apps/couch/src/test_util.erl b/apps/couch/src/test_util.erl index 55b95139..f086bf94 100644 --- a/apps/couch/src/test_util.erl +++ b/apps/couch/src/test_util.erl @@ -14,6 +14,7 @@ -export([init_code_path/0]). -export([source_file/1, build_file/1, config_files/0]). +-export([request/3, request/4]). init_code_path() -> code:load_abs("apps/couch/test/etap/etap"). @@ -31,3 +32,30 @@ config_files() -> source_file("test/etap/random_port.ini") ]. +request(Url, Headers, Method) -> + request(Url, Headers, Method, []). + +request(Url, Headers, Method, Body) -> + request(Url, Headers, Method, Body, 3). + +request(_Url, _Headers, _Method, _Body, 0) -> + {error, request_failed}; +request(Url, Headers, Method, Body, N) -> + case code:is_loaded(ibrowse) of + false -> + {ok, _} = ibrowse:start(); + _ -> + ok + end, + case ibrowse:send_req(Url, Headers, Method, Body) of + {ok, Code0, RespHeaders, RespBody0} -> + Code = list_to_integer(Code0), + RespBody = iolist_to_binary(RespBody0), + {ok, Code, RespHeaders, RespBody}; + {error, {'EXIT', {normal, _}}} -> + % Connection closed right after a successful request that + % used the same connection. + request(Url, Headers, Method, Body, N - 1); + Error -> + Error + end. |