diff options
author | Adam Kocoloski <adam@cloudant.com> | 2011-08-15 15:45:15 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2011-08-15 15:45:15 -0400 |
commit | 6ffe1675dd7b004e48891956a6bdbe32899ce80c (patch) | |
tree | 57326d9a498481e65bb0db38c66daf10896801f1 /apps/couch/src | |
parent | 52ff89ff7996e839b9e2f91fd76184d362a8aeb0 (diff) | |
parent | fdd1a5d0bc48b49b0df5c9217beff9574011283c (diff) |
Merge branch '11554-merge-couchdb-1.1'
Diffstat (limited to 'apps/couch/src')
43 files changed, 3852 insertions, 1134 deletions
diff --git a/apps/couch/src/couch_btree.erl b/apps/couch/src/couch_btree.erl index 2fcc8ae7..3f2e86d8 100644 --- a/apps/couch/src/couch_btree.erl +++ b/apps/couch/src/couch_btree.erl @@ -198,7 +198,7 @@ query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) -> {ok, NewRoot, Bt3} = complete_root(Bt2, KeyPointers), {ok, QueryResults, Bt3#btree{root=NewRoot}}. -% for ordering different operatations with the same key. +% for ordering different operations with the same key. % fetch < remove < insert op_order(fetch) -> 1; op_order(remove) -> 2; diff --git a/apps/couch/src/couch_changes.erl b/apps/couch/src/couch_changes.erl index 196f2fd5..44d0ad46 100644 --- a/apps/couch/src/couch_changes.erl +++ b/apps/couch/src/couch_changes.erl @@ -17,17 +17,19 @@ configure_filter/4, filter/2]). %% @spec handle_changes(#changes_args{}, #httpd{} | {json_req, {[any()]}}, #db{}) -> any() -handle_changes(#changes_args{filter=Raw, style=Style}=Args1, Req, Db) -> - Args = Args1#changes_args{filter=make_filter_fun(Raw, Style, Req, Db)}, +handle_changes(#changes_args{style=Style}=Args1, Req, Db) -> + #changes_args{feed = Feed} = Args = Args1#changes_args{ + filter = make_filter_fun(Args1#changes_args.filter, Style, Req, Db) + }, StartSeq = case Args#changes_args.dir of rev -> couch_db:get_update_seq(Db); fwd -> Args#changes_args.since end, - if Args#changes_args.feed == "continuous" orelse - Args#changes_args.feed == "longpoll" -> - fun(Callback) -> + if Feed == "continuous" orelse Feed == "longpoll" -> + fun(CallbackAcc) -> + {Callback, UserAcc} = get_callback_acc(CallbackAcc), Self = self(), {ok, Notify} = couch_db_update_notifier:start_link( fun({_, DbName}) when DbName == Db#db.name -> @@ -36,12 +38,13 @@ handle_changes(#changes_args{filter=Raw, style=Style}=Args1, Req, Db) -> ok end ), - start_sending_changes(Callback, Args#changes_args.feed), + UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), try keep_sending_changes( Args, Callback, + UserAcc2, Db, StartSeq, <<"">>, @@ -50,37 +53,52 @@ handle_changes(#changes_args{filter=Raw, style=Style}=Args1, Req, Db) -> ) after couch_db_update_notifier:stop(Notify), - get_rest_db_updated() % clean out any remaining update messages + get_rest_db_updated(ok) % clean out any remaining update messages end end; true -> - fun(Callback) -> - start_sending_changes(Callback, Args#changes_args.feed), - {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} = + fun(CallbackAcc) -> + {Callback, UserAcc} = get_callback_acc(CallbackAcc), + UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), + {ok, {_, LastSeq, _Prepend, _, _, UserAcc3, _, _, _, _}} = send_changes( Args#changes_args{feed="normal"}, Callback, + UserAcc2, Db, StartSeq, - <<"">> + <<>> ), - end_sending_changes(Callback, LastSeq, Args#changes_args.feed) + end_sending_changes(Callback, UserAcc3, LastSeq, Feed) end end. +get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) -> + Pair; +get_callback_acc(Callback) when is_function(Callback, 2) -> + {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}. + %% @spec make_filter_fun(string(), main_only|all_docs, #httpd{} | {json_req, %% {[any()]}}, #db{}) -> fun() -make_filter_fun(Filter, Style, Req, Db) when is_list(Filter) -> - case [?l2b(couch_httpd:unquote(X)) || X <- string:tokens(Filter, "/")] of +make_filter_fun([$_ | _] = FilterName, Style, Req, Db) -> + builtin_filter_fun(FilterName, Style, Req, Db); +make_filter_fun(FilterName, Style, Req, Db) -> + os_filter_fun(FilterName, Style, Req, Db). + +os_filter_fun(FilterName, Style, Req, Db) -> + case [list_to_binary(couch_httpd:unquote(Part)) + || Part <- string:tokens(FilterName, "/")] of [] -> - make_filter_fun(nil, Style, Req, Db); + fun(_Db2, #doc_info{revs=Revs}) -> + builtin_results(Style, Revs) + end; [DName, FName] -> DesignId = <<"_design/", DName/binary>>, DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), % validate that the ddoc has the filter fun #doc{body={Props}} = DDoc, couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]), - fun(DocInfo) -> + fun(Db2, DocInfo) -> DocInfos = case Style of main_only -> @@ -89,10 +107,10 @@ make_filter_fun(Filter, Style, Req, Db) when is_list(Filter) -> [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs] end, Docs = [Doc || {ok, Doc} <- [ - couch_db:open_doc(Db, DocInfo2, [deleted, conflicts]) + couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts]) || DocInfo2 <- DocInfos]], {ok, Passes} = couch_query_servers:filter_docs( - Req, Db, DDoc, FName, Docs + Req, Db2, DDoc, FName, Docs ), [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]} || {Pass, #doc{revs={RevPos,[RevId|_]}}} @@ -101,9 +119,50 @@ make_filter_fun(Filter, Style, Req, Db) when is_list(Filter) -> _Else -> throw({bad_request, "filter parameter must be of the form `designname/filtername`"}) + end. + +builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) -> + filter_docids(couch_util:get_value(<<"doc_ids">>, Props), Style); +builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) -> + {Props} = couch_httpd:json_body_obj(Req), + DocIds = couch_util:get_value(<<"doc_ids">>, Props, nil), + filter_docids(DocIds, Style); +builtin_filter_fun("_doc_ids", Style, #httpd{method='GET'}=Req, _Db) -> + DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")), + filter_docids(DocIds, Style); +builtin_filter_fun("_design", Style, _Req, _Db) -> + filter_designdoc(Style); +builtin_filter_fun(_FilterName, _Style, _Req, _Db) -> + throw({bad_request, "unknown builtin filter name"}). + +filter_docids(DocIds, Style) when is_list(DocIds)-> + fun(_Db, #doc_info{id=DocId, revs=Revs}) -> + case lists:member(DocId, DocIds) of + true -> + builtin_results(Style, Revs); + _ -> [] + end end; -make_filter_fun(_, Style, _, _) -> - fun(DI) -> ?MODULE:filter(DI, Style) end. +filter_docids(_, _) -> + throw({bad_request, "`doc_ids` filter parameter is not a list."}). + +filter_designdoc(Style) -> + fun(_Db, #doc_info{id=DocId, revs=Revs}) -> + case DocId of + <<"_design", _/binary>> -> + builtin_results(Style, Revs); + _ -> [] + end + end. + +builtin_results(Style, [#rev_info{rev=Rev}|_]=Revs) -> + case Style of + main_only -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; + all_docs -> + [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} + || #rev_info{rev=R} <- Revs] + end. configure_filter(Filter, Style, Req, Db) when is_list(Filter) -> case [?l2b(couch_httpd:unquote(X)) || X <- string:tokens(Filter, "/")] of @@ -118,6 +177,17 @@ configure_filter(Filter, Style, Req, Db) when is_list(Filter) -> #doc{body={Props}} = DDoc, couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]), {custom, Style, {Db, JsonReq, DDoc, FName}}; + [<<"_doc_ids">>] -> + DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")), + case is_list(DocIds) of + true -> ok; + false -> throw({bad_request, "`doc_ids` filter parameter is not a list."}) + end, + {builtin, Style, {doc_ids, DocIds}}; + [<<"_design">>] -> + {builtin, Style, design}; + [<<"_", _/binary>>] -> + throw({bad_request, "unknown builtin filter name"}); _Else -> throw({bad_request, "filter parameter must be of the form `designname/filtername`"}) @@ -132,7 +202,11 @@ filter(#doc_info{revs=Revs}, all_docs) -> filter(#doc_info{id=Id, revs=RevInfos}, {custom, main_only, Acc}) -> custom_filter(Id, [(hd(RevInfos))#rev_info.rev], Acc); filter(#doc_info{id=Id, revs=RevInfos}, {custom, all_docs, Acc}) -> - custom_filter(Id, [R || #rev_info{rev=R} <- RevInfos], Acc). + custom_filter(Id, [R || #rev_info{rev=R} <- RevInfos], Acc); +filter(#doc_info{id=Id, revs=RevInfos}, {builtin, main_only, Acc}) -> + builtin_filter(Id, [(hd(RevInfos))#rev_info.rev], Acc); +filter(#doc_info{id=Id, revs=RevInfos}, {builtin, all_docs, Acc}) -> + builtin_filter(Id, [R || #rev_info{rev=R} <- RevInfos], Acc). custom_filter(Id, Revs, {Db, JsonReq, DDoc, Filter}) -> {ok, Results} = fabric:open_revs(Db, Id, Revs, [deleted, conflicts]), @@ -144,6 +218,21 @@ custom_filter(Id, Revs, {Db, JsonReq, DDoc, Filter}) -> || {Pass, #doc{revs={RevPos,[RevId|_]}}} <- lists:zip(Passes, Docs), Pass == true]. +builtin_filter(Id, Revs, design) -> + case Id of + <<"_design", _/binary>> -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || Rev <- Revs]; + _ -> + [] + end; +builtin_filter(Id, Revs, {doc_ids, DocIds}) -> + case lists:member(Id, DocIds) of + true -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || Rev <- Revs]; + false -> + [] + end. + get_changes_timeout(Args, Callback) -> #changes_args{ heartbeat = Heartbeat, @@ -157,28 +246,31 @@ get_changes_timeout(Args, Callback) -> undefined -> case Timeout of undefined -> - {DefaultTimeout, fun() -> stop end}; + {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end}; infinity -> - {infinity, fun() -> stop end}; + {infinity, fun(UserAcc) -> {stop, UserAcc} end}; _ -> - {lists:min([DefaultTimeout, Timeout]), fun() -> stop end} + {lists:min([DefaultTimeout, Timeout]), + fun(UserAcc) -> {stop, UserAcc} end} end; true -> - {DefaultTimeout, fun() -> Callback(timeout, ResponseType), ok end}; + {DefaultTimeout, + fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}; _ -> {lists:min([DefaultTimeout, Heartbeat]), - fun() -> Callback(timeout, ResponseType), ok end} + fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end} end. -start_sending_changes(_Callback, "continuous") -> - ok; -start_sending_changes(Callback, ResponseType) -> - Callback(start, ResponseType). +start_sending_changes(_Callback, UserAcc, "continuous") -> + UserAcc; +start_sending_changes(Callback, UserAcc, ResponseType) -> + Callback(start, ResponseType, UserAcc). -send_changes(Args, Callback, Db, StartSeq, Prepend) -> +send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend) -> #changes_args{ style = Style, include_docs = IncludeDocs, + conflicts = Conflicts, limit = Limit, feed = ResponseType, dir = Dir, @@ -190,33 +282,36 @@ send_changes(Args, Callback, Db, StartSeq, Prepend) -> StartSeq, fun changes_enumerator/2, [{dir, Dir}], - {Db, StartSeq, Prepend, FilterFun, Callback, ResponseType, Limit, - IncludeDocs} + {Db, StartSeq, Prepend, FilterFun, Callback, UserAcc, ResponseType, + Limit, IncludeDocs, Conflicts} ). -keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, +keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> #changes_args{ feed = ResponseType, - limit = Limit + limit = Limit, + db_open_options = DbOptions } = Args, % ?LOG_INFO("send_changes start ~p",[StartSeq]), - {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes( - Args#changes_args{dir=fwd}, Callback, Db, StartSeq, Prepend + {ok, {_, EndSeq, Prepend2, _, _, UserAcc2, _, NewLimit, _, _}} = send_changes( + Args#changes_args{dir=fwd}, Callback, UserAcc, Db, StartSeq, Prepend ), % ?LOG_INFO("send_changes last ~p",[EndSeq]), couch_db:close(Db), if Limit > NewLimit, ResponseType == "longpoll" -> - end_sending_changes(Callback, EndSeq, ResponseType); + end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType); true -> - case wait_db_updated(Timeout, TimeoutFun) of - updated -> + case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of + {updated, UserAcc3} -> % ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]), - case couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]) of + DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions], + case couch_db:open(Db#db.name, DbOptions1) of {ok, Db2} -> keep_sending_changes( Args#changes_args{limit=NewLimit}, Callback, + UserAcc3, Db2, EndSeq, Prepend2, @@ -224,79 +319,96 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, TimeoutFun ); _Else -> - end_sending_changes(Callback, EndSeq, ResponseType) + end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType) end; - stop -> + {stop, UserAcc3} -> % ?LOG_INFO("wait_db_updated stop ~p",[{Db#db.name, EndSeq}]), - end_sending_changes(Callback, EndSeq, ResponseType) + end_sending_changes(Callback, UserAcc3, EndSeq, ResponseType) end end. -end_sending_changes(Callback, EndSeq, ResponseType) -> - Callback({stop, EndSeq}, ResponseType). +end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> + Callback({stop, EndSeq}, ResponseType, UserAcc). -changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous", - Limit, IncludeDocs}) -> +changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, UserAcc, + "continuous", Limit, IncludeDocs, Conflicts}) -> - #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} - = DocInfo, - Results0 = FilterFun(DocInfo), + #doc_info{high_seq = Seq} = DocInfo, + Results0 = FilterFun(Db, DocInfo), Results = [Result || Result <- Results0, Result /= null], Go = if Limit =< 1 -> stop; true -> ok end, case Results of [] -> - {Go, {Db, Seq, nil, FilterFun, Callback, "continuous", Limit, - IncludeDocs} + {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc, "continuous", Limit, + IncludeDocs, Conflicts} }; _ -> - ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs), - Callback({change, ChangesRow, <<"">>}, "continuous"), - {Go, {Db, Seq, nil, FilterFun, Callback, "continuous", Limit - 1, - IncludeDocs} + ChangesRow = changes_row(Db, Results, DocInfo, IncludeDocs, Conflicts), + UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc), + {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc2, "continuous", + Limit - 1, IncludeDocs, Conflicts} } end; -changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, ResponseType, - Limit, IncludeDocs}) -> +changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, UserAcc, + ResponseType, Limit, IncludeDocs, Conflicts}) -> - #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} - = DocInfo, - Results0 = FilterFun(DocInfo), + #doc_info{high_seq = Seq} = DocInfo, + Results0 = FilterFun(Db, DocInfo), Results = [Result || Result <- Results0, Result /= null], - Go = if Limit =< 1 -> stop; true -> ok end, + Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, case Results of [] -> - {Go, {Db, Seq, Prepend, FilterFun, Callback, ResponseType, Limit, - IncludeDocs} + {Go, {Db, Seq, Prepend, FilterFun, Callback, UserAcc, ResponseType, + Limit, IncludeDocs, Conflicts} }; _ -> - ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs), - Callback({change, ChangesRow, Prepend}, ResponseType), - {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, ResponseType, Limit - 1, - IncludeDocs} + ChangesRow = changes_row(Db, Results, DocInfo, IncludeDocs, Conflicts), + UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), + {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, UserAcc2, ResponseType, + Limit - 1, IncludeDocs, Conflicts} } end. -changes_row(Db, Seq, Id, Del, Results, Rev, true) -> - {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++ - deleted_item(Del) ++ couch_httpd_view:doc_member(Db, {Id, Rev})}; -changes_row(_, Seq, Id, Del, Results, _, false) -> +changes_row(Db, Results, DocInfo, IncludeDoc, Conflicts) -> + #doc_info{ + id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _] + } = DocInfo, {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++ - deleted_item(Del)}. + deleted_item(Del) ++ case IncludeDoc of + true -> + Options = if Conflicts -> [conflicts]; true -> [] end, + couch_httpd_view:doc_member(Db, DocInfo, Options); + false -> + [] + end}. -deleted_item(true) -> [{deleted, true}]; +deleted_item(true) -> [{<<"deleted">>, true}]; deleted_item(_) -> []. % waits for a db_updated msg, if there are multiple msgs, collects them. -wait_db_updated(Timeout, TimeoutFun) -> - receive db_updated -> get_rest_db_updated() +wait_db_updated(Timeout, TimeoutFun, UserAcc) -> + receive + db_updated -> + get_rest_db_updated(UserAcc) after Timeout -> - case TimeoutFun() of - ok -> wait_db_updated(Timeout, TimeoutFun); - stop -> stop + {Go, UserAcc2} = TimeoutFun(UserAcc), + case Go of + ok -> + wait_db_updated(Timeout, TimeoutFun, UserAcc2); + stop -> + {stop, UserAcc2} end end. +get_rest_db_updated(UserAcc) -> + receive + db_updated -> + get_rest_db_updated(UserAcc) + after 0 -> + {updated, UserAcc} + end. + get_rest_db_updated() -> receive db_updated -> get_rest_db_updated() after 0 -> updated diff --git a/apps/couch/src/couch_config.erl b/apps/couch/src/couch_config.erl index 73abdfd5..933bb5d5 100644 --- a/apps/couch/src/couch_config.erl +++ b/apps/couch/src/couch_config.erl @@ -93,15 +93,19 @@ register(Fun, Pid) -> init(IniFiles) -> ets:new(?MODULE, [named_table, set, protected]), - lists:map(fun(IniFile) -> - {ok, ParsedIniValues} = parse_ini_file(IniFile), - ets:insert(?MODULE, ParsedIniValues) - end, IniFiles), - WriteFile = case IniFiles of - [_|_] -> lists:last(IniFiles); - _ -> undefined - end, - {ok, #config{write_filename=WriteFile}}. + try + lists:map(fun(IniFile) -> + {ok, ParsedIniValues} = parse_ini_file(IniFile), + ets:insert(?MODULE, ParsedIniValues) + end, IniFiles), + WriteFile = case IniFiles of + [_|_] -> lists:last(IniFiles); + _ -> undefined + end, + {ok, #config{write_filename = WriteFile}} + catch _Tag:Error -> + {stop, Error} + end. terminate(_Reason, _State) -> @@ -112,8 +116,7 @@ handle_call(all, _From, Config) -> Resp = lists:sort((ets:tab2list(?MODULE))), {reply, Resp, Config}; handle_call({set, Sec, Key, Val, Persist}, _From, Config) -> - true = ets:insert(?MODULE, {{Sec, Key}, Val}), - case {Persist, Config#config.write_filename} of + Result = case {Persist, Config#config.write_filename} of {true, undefined} -> ok; {true, FileName} -> @@ -121,9 +124,15 @@ handle_call({set, Sec, Key, Val, Persist}, _From, Config) -> _ -> ok end, - Event = {config_change, Sec, Key, Val, Persist}, - gen_event:sync_notify(couch_config_event, Event), - {reply, ok, Config}; + case Result of + ok -> + true = ets:insert(?MODULE, {{Sec, Key}, Val}), + Event = {config_change, Sec, Key, Val, Persist}, + gen_event:sync_notify(couch_config_event, Event), + {reply, ok, Config}; + _Error -> + {reply, Result, Config} + end; handle_call({delete, Sec, Key, Persist}, _From, Config) -> true = ets:delete(?MODULE, {Sec,Key}), case {Persist, Config#config.write_filename} of @@ -158,6 +167,8 @@ parse_ini_file(IniFile) -> case file:read_file(IniFilename) of {ok, IniBin0} -> IniBin0; + {error, eacces} -> + throw({file_permission_error, IniFile}); {error, enoent} -> Fmt = "Couldn't find server configuration file ~s.", Msg = ?l2b(io_lib:format(Fmt, [IniFilename])), diff --git a/apps/couch/src/couch_config_writer.erl b/apps/couch/src/couch_config_writer.erl index c8691d79..decd269a 100644 --- a/apps/couch/src/couch_config_writer.erl +++ b/apps/couch/src/couch_config_writer.erl @@ -35,7 +35,14 @@ save_to_file({{Section, Key}, Value}, File) -> NewLines = process_file_lines(Lines, [], SectionLine, Pattern, Key, Value), NewFileContents = reverse_and_add_newline(strip_empty_lines(NewLines), []), - ok = file:write_file(File, NewFileContents). + case file:write_file(File, NewFileContents) of + ok -> + ok; + {error, eacces} -> + {file_permission_error, File}; + Error -> + Error + end. process_file_lines([Section|Rest], SeenLines, Section, Pattern, Key, Value) -> diff --git a/apps/couch/src/couch_db.erl b/apps/couch/src/couch_db.erl index dec6870f..96c49886 100644 --- a/apps/couch/src/couch_db.erl +++ b/apps/couch/src/couch_db.erl @@ -24,7 +24,7 @@ -export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]). -export([set_security/2,get_security/1]). -export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]). --export([check_is_admin/1, check_is_reader/1, get_doc_count/1, load_validation_funs/1]). +-export([check_is_admin/1, check_is_reader/1, get_doc_count/1]). -export([reopen/1, make_doc/5]). -include("couch_db.hrl"). @@ -779,6 +779,8 @@ update_docs(Db, Docs, Options, interactive_edit) -> % for the doc. make_first_doc_on_disk(_Db, _Id, _Pos, []) -> nil; +make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #doc{}} | RestPath]) -> + make_first_doc_on_disk(Db, Id, Pos-1, RestPath); make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) -> make_first_doc_on_disk(Db, Id, Pos - 1, RestPath); make_first_doc_on_disk(Db, Id, Pos, [{_, #leaf{deleted=IsDel, ptr=Sp}} |_]=DocPath) -> @@ -856,7 +858,7 @@ doc_flush_atts(Doc, Fd) -> Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}. check_md5(_NewSig, <<>>) -> ok; -check_md5(Sig1, Sig2) when Sig1 == Sig2 -> ok; +check_md5(Sig, Sig) -> ok; check_md5(_, _) -> throw(md5_mismatch). flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd -> @@ -967,10 +969,15 @@ with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) -> write_streamed_attachment(_Stream, _F, 0) -> ok; write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 -> - Bin = F(), + Bin = read_next_chunk(F, LenLeft), ok = couch_stream:write(Stream, Bin), write_streamed_attachment(Stream, F, LenLeft - size(Bin)). +read_next_chunk(F, _) when is_function(F, 0) -> + F(); +read_next_chunk(F, LenLeft) when is_function(F, 1) -> + F(lists:min([LenLeft, 16#2000])). + enum_docs_since_reduce_to_count(Reds) -> couch_btree:final_reduce( fun couch_db_updater:btree_by_seq_reduce/2, Reds). diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl index 138930f1..9bf52ee0 100644 --- a/apps/couch/src/couch_db_updater.erl +++ b/apps/couch/src/couch_db_updater.erl @@ -52,7 +52,7 @@ init({DbName, Filepath, Fd, Options}) -> terminate(_Reason, Db) -> - couch_file:close(Db#db.fd), + ok = couch_file:close(Db#db.fd), couch_util:shutdown_sync(Db#db.compactor_pid), couch_util:shutdown_sync(Db#db.fd), ok. @@ -133,8 +133,9 @@ handle_call({purge_docs, IdRevs}, _From, Db) -> {DocInfoToUpdate, NewSeq} = lists:mapfoldl( fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) -> - Tree2 = couch_key_tree:map_leafs( fun(RevInfo) -> - RevInfo#rev_info{seq=SeqAcc + 1} + Tree2 = couch_key_tree:map_leafs( + fun(_RevId, {IsDeleted, BodyPointer, _UpdateSeq}) -> + {IsDeleted, BodyPointer, SeqAcc + 1} end, Tree), {couch_doc:to_doc_info(FullInfo#full_doc_info{rev_tree=Tree2}), SeqAcc + 1} @@ -512,9 +513,9 @@ flush_trees(#db{fd=Fd,header=Header}=Db, {ok, NewSummaryPointer} = case Header#db_header.disk_version < 4 of true -> - couch_file:append_term(Fd, {Doc#doc.body, DiskAtts}); + {ok, _} = couch_file:append_term(Fd, {Doc#doc.body, DiskAtts}); false -> - couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts}) + {ok, _} = couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts}) end, #leaf{ deleted = IsDeleted, @@ -666,10 +667,11 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> % Check if we just updated any design documents, and update the validation % funs if we did. - case [1 || <<"_design/",_/binary>> <- Ids] of - [] -> + case lists:any( + fun(<<"_design/", _/binary>>) -> true; (_) -> false end, Ids) of + false -> Db4 = Db3; - _ -> + true -> Db4 = refresh_validate_doc_funs(Db3) end, @@ -687,7 +689,8 @@ compute_data_sizes([FullDocInfo | RestDocInfos], Acc) -> - +update_local_docs(Db, []) -> + {ok, Db}; update_local_docs(#db{local_tree=Btree}=Db, Docs) -> Ids = [Id || {_Client, #doc{id=Id}} <- Docs], OldDocLookups = couch_btree:lookup(Btree, Ids), diff --git a/apps/couch/src/couch_doc.erl b/apps/couch/src/couch_doc.erl index 9f0dae45..33d7e3cf 100644 --- a/apps/couch/src/couch_doc.erl +++ b/apps/couch/src/couch_doc.erl @@ -13,11 +13,12 @@ -module(couch_doc). -export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]). --export([att_foldl/3,att_foldl_decode/3,get_validate_doc_fun/1]). +-export([att_foldl/3,range_att_foldl/5,att_foldl_decode/3,get_validate_doc_fun/1]). -export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]). -export([validate_docid/1]). -export([doc_from_multi_part_stream/2]). -export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]). +-export([abort_multi_part_stream/1]). -include("couch_db.hrl"). @@ -87,8 +88,14 @@ to_json_attachments(Atts, OutputData, DataToFollow, ShowEncInfo) -> fun(#att{disk_len=DiskLen, att_len=AttLen, encoding=Enc}=Att) -> {Att#att.name, {[ {<<"content_type">>, Att#att.type}, - {<<"revpos">>, Att#att.revpos} - ] ++ + {<<"revpos">>, Att#att.revpos}] ++ + case Att#att.md5 of + <<>> -> + []; + Md5 -> + EncodedMd5 = base64:encode(Md5), + [{<<"digest">>, <<"md5-",EncodedMd5/binary>>}] + end ++ if not OutputData orelse Att#att.data == stub -> [{<<"length">>, DiskLen}, {<<"stub">>, true}]; true -> @@ -165,6 +172,10 @@ parse_revs([Rev | Rest]) -> validate_docid(Id) when is_binary(Id) -> + case couch_util:validate_utf8(Id) of + false -> throw({bad_request, <<"Document id must be valid UTF-8">>}); + true -> ok + end, case Id of <<"_design/", _/binary>> -> ok; <<"_local/", _/binary>> -> ok; @@ -195,6 +206,12 @@ transfer_fields([{<<"_rev">>, _Rev} | Rest], Doc) -> transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) -> Atts = lists:map(fun({Name, {BinProps}}) -> + Md5 = case couch_util:get_value(<<"digest">>, BinProps) of + <<"md5-",EncodedMd5/binary>> -> + base64:decode(EncodedMd5); + _ -> + <<>> + end, case couch_util:get_value(<<"stub">>, BinProps) of true -> Type = couch_util:get_value(<<"content_type">>, BinProps), @@ -202,7 +219,7 @@ transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) -> DiskLen = couch_util:get_value(<<"length">>, BinProps), {Enc, EncLen} = att_encoding_info(BinProps), #att{name=Name, data=stub, type=Type, att_len=EncLen, - disk_len=DiskLen, encoding=Enc, revpos=RevPos}; + disk_len=DiskLen, encoding=Enc, revpos=RevPos, md5=Md5}; _ -> Type = couch_util:get_value(<<"content_type">>, BinProps, ?DEFAULT_ATTACHMENT_CONTENT_TYPE), @@ -212,7 +229,7 @@ transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) -> DiskLen = couch_util:get_value(<<"length">>, BinProps), {Enc, EncLen} = att_encoding_info(BinProps), #att{name=Name, data=follows, type=Type, encoding=Enc, - att_len=EncLen, disk_len=DiskLen, revpos=RevPos}; + att_len=EncLen, disk_len=DiskLen, revpos=RevPos, md5=Md5}; _ -> Value = couch_util:get_value(<<"data">>, BinProps), Bin = base64:decode(Value), @@ -252,6 +269,17 @@ transfer_fields([{<<"_conflicts">>, _} | Rest], Doc) -> transfer_fields([{<<"_deleted_conflicts">>, _} | Rest], Doc) -> transfer_fields(Rest, Doc); +% special fields for replication documents +transfer_fields([{<<"_replication_state">>, _} = Field | Rest], + #doc{body=Fields} = Doc) -> + transfer_fields(Rest, Doc#doc{body=[Field|Fields]}); +transfer_fields([{<<"_replication_state_time">>, _} = Field | Rest], + #doc{body=Fields} = Doc) -> + transfer_fields(Rest, Doc#doc{body=[Field|Fields]}); +transfer_fields([{<<"_replication_id">>, _} = Field | Rest], + #doc{body=Fields} = Doc) -> + transfer_fields(Rest, Doc#doc{body=[Field|Fields]}); + % unknown special field transfer_fields([{<<"_",Name/binary>>, _} | _], _) -> throw({doc_validation, @@ -307,6 +335,9 @@ att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) -> att_foldl(#att{data=DataFun,att_len=Len}, Fun, Acc) when is_function(DataFun) -> fold_streamed_data(DataFun, Len, Fun, Acc). +range_att_foldl(#att{data={Fd,Sp}}, From, To, Fun, Acc) -> + couch_stream:range_foldl(Fd, Sp, From, To, Fun, Acc). + att_foldl_decode(#att{data={Fd,Sp},md5=Md5,encoding=Enc}, Fun, Acc) -> couch_stream:foldl_decode(Fd, Sp, Md5, Enc, Fun, Acc); att_foldl_decode(#att{data=Fun2,att_len=Len, encoding=identity}, Fun, Acc) -> @@ -445,11 +476,13 @@ atts_to_mp([Att | RestAtts], Boundary, WriteFun, doc_from_multi_part_stream(ContentType, DataFun) -> - Self = self(), + Parent = self(), Parser = spawn_link(fun() -> - couch_httpd:parse_multipart_request(ContentType, DataFun, - fun(Next)-> mp_parse_doc(Next, []) end), - unlink(Self) + {<<"--">>, _, _} = couch_httpd:parse_multipart_request( + ContentType, DataFun, + fun(Next) -> mp_parse_doc(Next, []) end), + unlink(Parent), + Parent ! {self(), finished} end), Parser ! {get_doc_bytes, self()}, receive @@ -463,7 +496,11 @@ doc_from_multi_part_stream(ContentType, DataFun) -> (A) -> A end, Doc#doc.atts), - {ok, Doc#doc{atts=Atts2}} + WaitFun = fun() -> + receive {Parser, finished} -> ok end, + erlang:put(mochiweb_request_recv, true) + end, + {ok, Doc#doc{atts=Atts2}, WaitFun, Parser} end. mp_parse_doc({headers, H}, []) -> @@ -554,3 +591,20 @@ maybe_send_data({ChunkList, Offset, Counters, Waiting}) -> end end end. + +abort_multi_part_stream(Parser) -> + abort_multi_part_stream(Parser, erlang:monitor(process, Parser)). + +abort_multi_part_stream(Parser, MonRef) -> + case is_process_alive(Parser) of + true -> + Parser ! {get_bytes, self()}, + receive + {bytes, _Bytes} -> + abort_multi_part_stream(Parser, MonRef); + {'DOWN', MonRef, _, _, _} -> + ok + end; + false -> + erlang:demonitor(MonRef, [flush]) + end. diff --git a/apps/couch/src/couch_event_sup.erl b/apps/couch/src/couch_event_sup.erl index 6fd6963a..07c48790 100644 --- a/apps/couch/src/couch_event_sup.erl +++ b/apps/couch/src/couch_event_sup.erl @@ -50,8 +50,12 @@ stop(Pid) -> gen_server:cast(Pid, stop). init({EventMgr, EventHandler, Args}) -> - ok = gen_event:add_sup_handler(EventMgr, EventHandler, Args), - {ok, {EventMgr, EventHandler}}. + case gen_event:add_sup_handler(EventMgr, EventHandler, Args) of + ok -> + {ok, {EventMgr, EventHandler}}; + {stop, Error} -> + {stop, Error} + end. terminate(_Reason, _State) -> ok. diff --git a/apps/couch/src/couch_file.erl b/apps/couch/src/couch_file.erl index 3e4f29fe..dfc1f822 100644 --- a/apps/couch/src/couch_file.erl +++ b/apps/couch/src/couch_file.erl @@ -53,7 +53,10 @@ open(Filepath, Options) -> {trap_exit, true} -> receive {'EXIT', Pid, _} -> ok end; {trap_exit, false} -> ok end, - Error + case Error of + {error, eacces} -> {file_permission_error, Filepath}; + _ -> Error + end end; Error -> Error @@ -161,7 +164,7 @@ truncate(Fd, Pos) -> sync(Filepath) when is_list(Filepath) -> {ok, Fd} = file:open(Filepath, [append, raw]), - try file:sync(Fd) after file:close(Fd) end; + try ok = file:sync(Fd) after ok = file:close(Fd) end; sync(Fd) -> gen_server:call(Fd, sync, infinity). @@ -294,15 +297,23 @@ handle_call(close, _From, #file{fd=Fd}=File) -> {stop, normal, file:close(Fd), File#file{fd = nil}}; handle_call({pread_iolist, Pos}, _From, File) -> - {LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4), - case iolist_to_binary(LenIolist) of - <<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term - {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16), - {Md5, IoList} = extract_md5(Md5AndIoList), + {RawData, NextPos} = try + % up to 8Kbs of read ahead + read_raw_iolist_int(File, Pos, 2 * ?SIZE_BLOCK - (Pos rem ?SIZE_BLOCK)) + catch + _:_ -> + read_raw_iolist_int(File, Pos, 4) + end, + <<Prefix:1/integer, Len:31/integer, RestRawData/binary>> = + iolist_to_binary(RawData), + case Prefix of + 1 -> + {Md5, IoList} = extract_md5( + maybe_read_more_iolist(RestRawData, 16 + Len, NextPos, File)), {reply, {ok, IoList, Md5}, File}; - <<0:1/integer,Len:31/integer>> -> - {Iolist, _} = read_raw_iolist_int(File, NextPos, Len), - {reply, {ok, Iolist, <<>>}, File} + 0 -> + IoList = maybe_read_more_iolist(RestRawData, Len, NextPos, File), + {reply, {ok, IoList, <<>>}, File} end; handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) -> {ok, Bin} = file:pread(Fd, Pos, Bytes), @@ -504,18 +515,36 @@ find_header(Fd, Block) -> end. load_header(Fd, Block) -> - {ok, <<1>>} = file:pread(Fd, Block*?SIZE_BLOCK, 1), - {ok, <<HeaderLen:32/integer>>} = file:pread(Fd, (Block*?SIZE_BLOCK) + 1, 4), + {ok, <<1, HeaderLen:32/integer, RestBlock/binary>>} = + file:pread(Fd, Block * ?SIZE_BLOCK, ?SIZE_BLOCK), TotalBytes = calculate_total_read_len(1, HeaderLen), - {ok, <<RawBin:TotalBytes/binary>>} = - file:pread(Fd, (Block*?SIZE_BLOCK) + 5, TotalBytes), + case TotalBytes > byte_size(RestBlock) of + false -> + <<RawBin:TotalBytes/binary, _/binary>> = RestBlock; + true -> + {ok, Missing} = file:pread( + Fd, (Block * ?SIZE_BLOCK) + 5 + byte_size(RestBlock), + TotalBytes - byte_size(RestBlock)), + RawBin = <<RestBlock/binary, Missing/binary>> + end, <<Md5Sig:16/binary, HeaderBin/binary>> = iolist_to_binary(remove_block_prefixes(1, RawBin)), Md5Sig = couch_util:md5(HeaderBin), {ok, HeaderBin}. +maybe_read_more_iolist(Buffer, DataSize, _, _) + when DataSize =< byte_size(Buffer) -> + <<Data:DataSize/binary, _/binary>> = Buffer, + [Data]; +maybe_read_more_iolist(Buffer, DataSize, NextPos, File) -> + {Missing, _} = + read_raw_iolist_int(File, NextPos, DataSize - byte_size(Buffer)), + [Buffer, Missing]. + -spec read_raw_iolist_int(#file{}, Pos::non_neg_integer(), Len::non_neg_integer()) -> {Data::iolist(), CurPos::non_neg_integer()}. +read_raw_iolist_int(Fd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE + read_raw_iolist_int(Fd, Pos, Len); read_raw_iolist_int(#file{fd=Fd, tail_append_begin=TAB}, Pos, Len) -> BlockOffset = Pos rem ?SIZE_BLOCK, TotalBytes = calculate_total_read_len(BlockOffset, Len), diff --git a/apps/couch/src/couch_httpd.erl b/apps/couch/src/couch_httpd.erl index 0d9abde6..8fb2687c 100644 --- a/apps/couch/src/couch_httpd.erl +++ b/apps/couch/src/couch_httpd.erl @@ -13,35 +13,50 @@ -module(couch_httpd). -include("couch_db.hrl"). --export([start_link/0, stop/0, handle_request/7]). +-export([start_link/0, start_link/1, stop/0, handle_request/5]). --export([header_value/2,header_value/3,qs_value/2,qs_value/3,qs/1,path/1,absolute_uri/2,body_length/1]). +-export([header_value/2,header_value/3,qs_value/2,qs_value/3,qs/1,qs_json_value/3]). +-export([path/1,absolute_uri/2,body_length/1]). -export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4,error_info/1]). --export([make_fun_spec_strs/1, make_arity_1_fun/1]). +-export([make_fun_spec_strs/1]). +-export([make_arity_1_fun/1, make_arity_2_fun/1, make_arity_3_fun/1]). -export([parse_form/1,json_body/1,json_body_obj/1,body/1,doc_etag/1, make_etag/1, etag_respond/3]). -export([primary_header_value/2,partition/1,serve_file/3,serve_file/4, server_header/0]). -export([start_chunked_response/3,send_chunk/2,log_request/2]). --export([start_response_length/4, send/2]). +-export([start_response_length/4, start_response/3, send/2]). -export([start_json_response/2, start_json_response/3, end_json_response/1]). -export([send_response/4,send_method_not_allowed/2,send_error/4, send_redirect/2,send_chunked_error/2]). -export([send_json/2,send_json/3,send_json/4,last_chunk/1,parse_multipart_request/3]). -export([accepted_encodings/1,handle_request_int/5,validate_referer/1,validate_ctype/2]). start_link() -> + start_link(http). +start_link(http) -> + Port = couch_config:get("httpd", "port", "5984"), + start_link(?MODULE, [{port, Port}]); +start_link(https) -> + Port = couch_config:get("ssl", "port", "6984"), + CertFile = couch_config:get("ssl", "cert_file", nil), + KeyFile = couch_config:get("ssl", "key_file", nil), + Options = case CertFile /= nil andalso KeyFile /= nil of + true -> + [{port, Port}, + {ssl, true}, + {ssl_opts, [ + {certfile, CertFile}, + {keyfile, KeyFile}]}]; + false -> + io:format("SSL enabled but PEM certificates are missing.", []), + throw({error, missing_certs}) + end, + start_link(https, Options). +start_link(Name, Options) -> % read config and register for configuration changes % just stop if one of the config settings change. couch_server_sup % will restart us and then we will pick up the new settings. BindAddress = couch_config:get("httpd", "bind_address", any), - Port = couch_config:get("httpd", "port", "5984"), - MaxConnections = couch_config:get("httpd", "max_connections", "2048"), - VirtualHosts = couch_config:get("vhosts"), - VhostGlobals = re:split( - couch_config:get("httpd", "vhost_global_handlers", ""), - ", ?", - [{return, list}] - ), DefaultSpec = "{couch_httpd_db, handle_request}", DefaultFun = make_arity_1_fun( couch_config:get("httpd", "default_handler", DefaultSpec) @@ -65,21 +80,28 @@ start_link() -> UrlHandlers = dict:from_list(UrlHandlersList), DbUrlHandlers = dict:from_list(DbUrlHandlersList), DesignUrlHandlers = dict:from_list(DesignUrlHandlersList), + {ok, ServerOptions} = couch_util:parse_term( + couch_config:get("httpd", "server_options", "[]")), + {ok, SocketOptions} = couch_util:parse_term( + couch_config:get("httpd", "socket_options", "[]")), Loop = fun(Req)-> + case SocketOptions of + [] -> + ok; + _ -> + ok = mochiweb_socket:setopts(Req:get(socket), SocketOptions) + end, apply(?MODULE, handle_request, [ - Req, DefaultFun, UrlHandlers, DbUrlHandlers, DesignUrlHandlers, - VirtualHosts, VhostGlobals + Req, DefaultFun, UrlHandlers, DbUrlHandlers, DesignUrlHandlers ]) end, % and off we go - {ok, Pid} = case mochiweb_http:start([ + {ok, Pid} = case mochiweb_http:start(Options ++ ServerOptions ++ [ {loop, Loop}, - {name, ?MODULE}, - {ip, BindAddress}, - {port, Port}, - {max, MaxConnections} + {name, Name}, + {ip, BindAddress} ]) of {ok, MochiPid} -> {ok, MochiPid}; {error, Reason} -> @@ -92,15 +114,19 @@ start_link() -> ?MODULE:stop(); ("httpd", "port") -> ?MODULE:stop(); - ("httpd", "max_connections") -> - ?MODULE:stop(); ("httpd", "default_handler") -> ?MODULE:stop(); + ("httpd", "server_options") -> + ?MODULE:stop(); + ("httpd", "socket_options") -> + ?MODULE:stop(); ("httpd_global_handlers", _) -> ?MODULE:stop(); ("httpd_db_handlers", _) -> ?MODULE:stop(); ("vhosts", _) -> + ?MODULE:stop(); + ("ssl", _) -> ?MODULE:stop() end, Pid), @@ -139,50 +165,13 @@ make_fun_spec_strs(SpecStr) -> stop() -> mochiweb_http:stop(?MODULE). -%% -% if there's a vhost definition that matches the request, redirect internally -redirect_to_vhost(MochiReq, DefaultFun, - UrlHandlers, DbUrlHandlers, DesignUrlHandlers, VhostTarget) -> - - Path = MochiReq:get(raw_path), - Target = VhostTarget ++ Path, - ?LOG_DEBUG("Vhost Target: '~p'~n", [Target]), - % build a new mochiweb request - MochiReq1 = mochiweb_request:new(MochiReq:get(socket), - MochiReq:get(method), - Target, - MochiReq:get(version), - MochiReq:get(headers)), - % cleanup, It force mochiweb to reparse raw uri. - MochiReq1:cleanup(), +handle_request(MochiReq, DefaultFun, UrlHandlers, DbUrlHandlers, + DesignUrlHandlers) -> + MochiReq1 = couch_httpd_vhost:match_vhost(MochiReq), handle_request_int(MochiReq1, DefaultFun, - UrlHandlers, DbUrlHandlers, DesignUrlHandlers). - -handle_request(MochiReq, DefaultFun, - UrlHandlers, DbUrlHandlers, DesignUrlHandlers, VirtualHosts, VhostGlobals) -> - - % grab Host from Req - Vhost = MochiReq:get_header_value("Host"), - - % find Vhost in config - case couch_util:get_value(Vhost, VirtualHosts) of - undefined -> % business as usual - handle_request_int(MochiReq, DefaultFun, - UrlHandlers, DbUrlHandlers, DesignUrlHandlers); - VhostTarget -> - case vhost_global(VhostGlobals, MochiReq) of - true ->% global handler for vhosts - handle_request_int(MochiReq, DefaultFun, - UrlHandlers, DbUrlHandlers, DesignUrlHandlers); - _Else -> - % do rewrite - redirect_to_vhost(MochiReq, DefaultFun, - UrlHandlers, DbUrlHandlers, DesignUrlHandlers, VhostTarget) - end - end. - + UrlHandlers, DbUrlHandlers, DesignUrlHandlers). handle_request_int(MochiReq, DefaultFun, UrlHandlers, DbUrlHandlers, DesignUrlHandlers) -> @@ -194,6 +183,14 @@ handle_request_int(MochiReq, DefaultFun, RawUri = MochiReq:get(raw_path), {"/" ++ Path, _, _} = mochiweb_util:urlsplit_path(RawUri), + Headers = MochiReq:get(headers), + + % get requested path + RequestedPath = case MochiReq:get_header_value("x-couchdb-vhost-path") of + undefined -> RawUri; + P -> P + end, + HandlerKey = case mochiweb_util:partition(Path, "/") of {"", "", ""} -> @@ -201,10 +198,11 @@ handle_request_int(MochiReq, DefaultFun, {FirstPart, _, _} -> list_to_binary(FirstPart) end, - ?LOG_DEBUG("~p ~s ~p~nHeaders: ~p", [ + ?LOG_DEBUG("~p ~s ~p from ~p~nHeaders: ~p", [ MochiReq:get(method), RawUri, MochiReq:get(version), + MochiReq:get(peer), mochiweb_headers:to_list(MochiReq:get(headers)) ]), @@ -245,6 +243,8 @@ handle_request_int(MochiReq, DefaultFun, mochi_req = MochiReq, peer = MochiReq:get(peer), method = Method, + requested_path_parts = [list_to_binary(couch_httpd:unquote(Part)) + || Part <- string:tokens(RequestedPath, "/")], path_parts = [list_to_binary(couch_httpd:unquote(Part)) || Part <- string:tokens(Path, "/")], db_url_handlers = DbUrlHandlers, @@ -269,7 +269,7 @@ handle_request_int(MochiReq, DefaultFun, throw:{invalid_json, S} -> ?LOG_ERROR("attempted upload of invalid JSON (set log_level to debug to log it)", []), ?LOG_DEBUG("Invalid JSON: ~p",[S]), - send_error(HttpReq, {bad_request, "invalid UTF-8 JSON"}); + send_error(HttpReq, {bad_request, io_lib:format("invalid UTF-8 JSON: ~p",[S])}); throw:unacceptable_encoding -> ?LOG_ERROR("unsupported encoding method for the response", []), send_error(HttpReq, {not_acceptable, "unsupported encoding"}); @@ -326,18 +326,6 @@ authenticate_request(Response, _AuthSrcs) -> increment_method_stats(Method) -> couch_stats_collector:increment({httpd_request_methods, Method}). -% if so, then it will not be rewritten, but will run as a normal couchdb request. -% normally you'd use this for _uuids _utils and a few of the others you want to keep available on vhosts. You can also use it to make databases 'global'. -vhost_global(VhostGlobals, MochiReq) -> - "/" ++ Path = MochiReq:get(path), - Front = case partition(Path) of - {"", "", ""} -> - "/"; % Special case the root url handler - {FirstPart, _, _} -> - FirstPart - end, - [true] == [true||V <- VhostGlobals, V == Front]. - validate_referer(Req) -> Host = host_for_request(Req), Referer = header_value(Req, "Referer", fail), @@ -406,6 +394,14 @@ qs_value(Req, Key) -> qs_value(Req, Key, Default) -> couch_util:get_value(Key, qs(Req), Default). +qs_json_value(Req, Key, Default) -> + case qs_value(Req, Key, Default) of + Default -> + Default; + Result -> + ?JSON_DECODE(Result) + end. + qs(#httpd{mochi_req=MochiReq}) -> MochiReq:parse_qs(). @@ -430,15 +426,18 @@ absolute_uri(#httpd{mochi_req=MochiReq}=Req, Path) -> Host = host_for_request(Req), XSsl = couch_config:get("httpd", "x_forwarded_ssl", "X-Forwarded-Ssl"), Scheme = case MochiReq:get_header_value(XSsl) of - "on" -> "https"; - _ -> - XProto = couch_config:get("httpd", "x_forwarded_proto", "X-Forwarded-Proto"), - case MochiReq:get_header_value(XProto) of - % Restrict to "https" and "http" schemes only - "https" -> "https"; - _ -> "http" - end - end, + "on" -> "https"; + _ -> + XProto = couch_config:get("httpd", "x_forwarded_proto", "X-Forwarded-Proto"), + case MochiReq:get_header_value(XProto) of + %% Restrict to "https" and "http" schemes only + "https" -> "https"; + _ -> case MochiReq:get(scheme) of + https -> "https"; + http -> "http" + end + end + end, Scheme ++ "://" ++ Host ++ Path. unquote(UrlEncodedString) -> @@ -545,6 +544,18 @@ start_response_length(#httpd{mochi_req=MochiReq}=Req, Code, Headers, Length) -> end, {ok, Resp}. +start_response(#httpd{mochi_req=MochiReq}=Req, Code, Headers) -> + log_request(Req, Code), + couch_stats_collector:increment({httpd_status_cdes, Code}), + CookieHeader = couch_httpd_auth:cookie_auth_header(Req, Headers), + Headers2 = Headers ++ server_header() ++ CookieHeader, + Resp = MochiReq:start_response({Code, Headers2}), + case MochiReq:get(method) of + 'HEAD' -> throw({http_head_abort, Resp}); + _ -> ok + end, + {ok, Resp}. + send(Resp, Data) -> Resp:send(Data), {ok, Resp}. @@ -612,9 +623,7 @@ send_json(Req, Code, Headers, Value) -> {"Content-Type", negotiate_content_type(Req)}, {"Cache-Control", "must-revalidate"} ], - Body = list_to_binary( - [start_jsonp(Req), ?JSON_ENCODE(Value), end_jsonp(), $\n] - ), + Body = [start_jsonp(Req), ?JSON_ENCODE(Value), end_jsonp(), $\n], send_response(Req, Code, DefaultHeaders ++ Headers, Body). start_json_response(Req, Code) -> @@ -723,6 +732,8 @@ error_info(file_exists) -> "created, the file already exists.">>}; error_info({bad_ctype, Reason}) -> {415, <<"bad_content_type">>, Reason}; +error_info(requested_range_not_satisfiable) -> + {416, <<"requested_range_not_satisfiable">>, <<"Requested range not satisfiable">>}; error_info({error, illegal_database_name}) -> {400, <<"illegal_database_name">>, <<"Only lowercase characters (a-z), " "digits (0-9), and any of the characters _, $, (, ), +, -, and / " @@ -753,31 +764,29 @@ error_headers(#httpd{mochi_req=MochiReq}=Req, Code, ErrorStr, ReasonStr) -> % send the browser popup header no matter what if we are require_valid_user {Code, [{"WWW-Authenticate", "Basic realm=\"server\""}]}; _False -> - % if the accept header matches html, then do the redirect. else proceed as usual. - Accepts = case MochiReq:get_header_value("Accept") of - undefined -> - % According to the HTTP 1.1 spec, if the Accept - % header is missing, it means the client accepts - % all media types. - "html"; - Else -> - Else - end, - case re:run(Accepts, "\\bhtml\\b", - [{capture, none}, caseless]) of - nomatch -> + case MochiReq:accepts_content_type("application/json") of + true -> {Code, []}; - match -> - AuthRedirectBin = ?l2b(AuthRedirect), - % Redirect to the path the user requested, not - % the one that is used internally. - UrlReturnRaw = case MochiReq:get_header_value("x-couchdb-vhost-path") of - undefined -> MochiReq:get(path); - VHostPath -> VHostPath - end, - UrlReturn = ?l2b(couch_util:url_encode(UrlReturnRaw)), - UrlReason = ?l2b(couch_util:url_encode(ReasonStr)), - {302, [{"Location", couch_httpd:absolute_uri(Req, <<AuthRedirectBin/binary,"?return=",UrlReturn/binary,"&reason=",UrlReason/binary>>)}]} + false -> + case MochiReq:accepts_content_type("text/html") of + true -> + % Redirect to the path the user requested, not + % the one that is used internally. + UrlReturnRaw = case MochiReq:get_header_value("x-couchdb-vhost-path") of + undefined -> + MochiReq:get(path); + VHostPath -> + VHostPath + end, + RedirectLocation = lists:flatten([ + AuthRedirect, + "?return=", couch_util:url_encode(UrlReturnRaw), + "&reason=", couch_util:url_encode(ReasonStr) + ]), + {302, [{"Location", absolute_uri(Req, RedirectLocation)}]}; + false -> + {Code, []} + end end end end; @@ -842,9 +851,8 @@ negotiate_content_type(#httpd{mochi_req=MochiReq}) -> end. server_header() -> - OTPVersion = "R" ++ integer_to_list(erlang:system_info(compat_rel)) ++ "B", [{"Server", "CouchDB/" ++ couch:version() ++ - " (Erlang OTP/" ++ OTPVersion ++ ")"}]. + " (Erlang OTP/" ++ erlang:system_info(otp_release) ++ ")"}]. -record(mp, {boundary, buffer, data_fun, callback}). diff --git a/apps/couch/src/couch_httpd_auth.erl b/apps/couch/src/couch_httpd_auth.erl index 752fbef1..9f6ed18a 100644 --- a/apps/couch/src/couch_httpd_auth.erl +++ b/apps/couch/src/couch_httpd_auth.erl @@ -173,7 +173,7 @@ cookie_authentication_handler(#httpd{mochi_req=MochiReq}=Req) -> CurrentTime = make_cookie_time(), case couch_config:get("couch_httpd_auth", "secret", nil) of nil -> - ?LOG_ERROR("cookie auth secret is not set",[]), + ?LOG_DEBUG("cookie auth secret is not set",[]), Req; SecretStr -> Secret = ?l2b(SecretStr), @@ -207,7 +207,7 @@ cookie_authentication_handler(#httpd{mochi_req=MochiReq}=Req) -> end. cookie_auth_header(#httpd{user_ctx=#user_ctx{name=null}}, _Headers) -> []; -cookie_auth_header(#httpd{user_ctx=#user_ctx{name=User}, auth={Secret, true}}, Headers) -> +cookie_auth_header(#httpd{user_ctx=#user_ctx{name=User}, auth={Secret, true}}=Req, Headers) -> % Note: we only set the AuthSession cookie if: % * a valid AuthSession cookie has been received % * we are outside a 10% timeout window @@ -220,18 +220,18 @@ cookie_auth_header(#httpd{user_ctx=#user_ctx{name=User}, auth={Secret, true}}, H AuthSession = couch_util:get_value("AuthSession", Cookies), if AuthSession == undefined -> TimeStamp = make_cookie_time(), - [cookie_auth_cookie(?b2l(User), Secret, TimeStamp)]; + [cookie_auth_cookie(Req, ?b2l(User), Secret, TimeStamp)]; true -> [] end; cookie_auth_header(_Req, _Headers) -> []. -cookie_auth_cookie(User, Secret, TimeStamp) -> +cookie_auth_cookie(Req, User, Secret, TimeStamp) -> SessionData = User ++ ":" ++ erlang:integer_to_list(TimeStamp, 16), Hash = crypto:sha_mac(Secret, SessionData), mochiweb_cookies:cookie("AuthSession", couch_util:encodeBase64Url(SessionData ++ ":" ++ ?b2l(Hash)), - [{path, "/"}, {http_only, true}]). % TODO add {secure, true} when SSL is detected + [{path, "/"}] ++ cookie_scheme(Req)). hash_password(Password, Salt) -> ?l2b(couch_util:to_hex(crypto:sha(<<Password/binary, Salt/binary>>))). @@ -247,13 +247,17 @@ ensure_cookie_auth_secret() -> % session handlers % Login handler with user db -% TODO this should also allow a JSON POST handle_session_req(#httpd{method='POST', mochi_req=MochiReq}=Req) -> ReqBody = MochiReq:recv_body(), Form = case MochiReq:get_primary_header_value("content-type") of % content type should be json "application/x-www-form-urlencoded" ++ _ -> mochiweb_util:parse_qs(ReqBody); + "application/json" ++ _ -> + {Pairs} = ?JSON_DECODE(ReqBody), + lists:map(fun({Key, Value}) -> + {?b2l(Key), ?b2l(Value)} + end, Pairs); _ -> [] end, @@ -272,7 +276,7 @@ handle_session_req(#httpd{method='POST', mochi_req=MochiReq}=Req) -> % setup the session cookie Secret = ?l2b(ensure_cookie_auth_secret()), CurrentTime = make_cookie_time(), - Cookie = cookie_auth_cookie(?b2l(UserName), <<Secret/binary, UserSalt/binary>>, CurrentTime), + Cookie = cookie_auth_cookie(Req, ?b2l(UserName), <<Secret/binary, UserSalt/binary>>, CurrentTime), % TODO document the "next" feature in Futon {Code, Headers} = case couch_httpd:qs_value(Req, "next", nil) of nil -> @@ -288,7 +292,7 @@ handle_session_req(#httpd{method='POST', mochi_req=MochiReq}=Req) -> ]}); _Else -> % clear the session - Cookie = mochiweb_cookies:cookie("AuthSession", "", [{path, "/"}, {http_only, true}]), + Cookie = mochiweb_cookies:cookie("AuthSession", "", [{path, "/"}] ++ cookie_scheme(Req)), send_json(Req, 401, [Cookie], {[{error, <<"unauthorized">>},{reason, <<"Name or password is incorrect.">>}]}) end; % get user info @@ -318,7 +322,7 @@ handle_session_req(#httpd{method='GET', user_ctx=UserCtx}=Req) -> end; % logout by deleting the session handle_session_req(#httpd{method='DELETE'}=Req) -> - Cookie = mochiweb_cookies:cookie("AuthSession", "", [{path, "/"}, {http_only, true}]), + Cookie = mochiweb_cookies:cookie("AuthSession", "", [{path, "/"}] ++ cookie_scheme(Req)), {Code, Headers} = case couch_httpd:qs_value(Req, "next", nil) of nil -> {200, [Cookie]}; @@ -347,3 +351,10 @@ to_int(Value) when is_integer(Value) -> make_cookie_time() -> {NowMS, NowS, _} = erlang:now(), NowMS * 1000000 + NowS. + +cookie_scheme(#httpd{mochi_req=MochiReq}) -> + [{http_only, true}] ++ + case MochiReq:get(scheme) of + http -> []; + https -> [{secure, true}] + end. diff --git a/apps/couch/src/couch_httpd_db.erl b/apps/couch/src/couch_httpd_db.erl index 217a2d03..71204598 100644 --- a/apps/couch/src/couch_httpd_db.erl +++ b/apps/couch/src/couch_httpd_db.erl @@ -20,7 +20,7 @@ -import(couch_httpd, [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2, - start_json_response/2,start_json_response/3, + send_response/4,start_json_response/2,start_json_response/3, send_chunk/2,last_chunk/1,end_json_response/1, start_chunked_response/3, absolute_uri/2, send/2, start_response_length/4]). @@ -55,7 +55,15 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method, do_db_req(Req, Handler) end. +handle_changes_req(#httpd{method='POST'}=Req, Db) -> + couch_httpd:validate_ctype(Req, "application/json"), + handle_changes_req1(Req, Db); handle_changes_req(#httpd{method='GET'}=Req, Db) -> + handle_changes_req1(Req, Db); +handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> + send_method_not_allowed(Req, "GET,HEAD,POST"). + +handle_changes_req1(Req, Db) -> MakeCallback = fun(Resp) -> fun({change, Change, _}, "continuous") -> send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]); @@ -106,13 +114,16 @@ handle_changes_req(#httpd{method='GET'}=Req, Db) -> FeedChangesFun(MakeCallback(Resp)) end end, - couch_stats_collector:track_process_count( + couch_stats_collector:increment( {httpd, clients_requesting_changes} ), - WrapperFun(ChangesFun); - -handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> - send_method_not_allowed(Req, "GET,HEAD"). + try + WrapperFun(ChangesFun) + after + couch_stats_collector:decrement( + {httpd, clients_requesting_changes} + ) + end. handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, Db) -> ok = couch_db:check_is_admin(Db), @@ -353,9 +364,11 @@ db_req(#httpd{path_parts=[_,<<"_purge">>]}=Req, _Db) -> send_method_not_allowed(Req, "POST"); db_req(#httpd{method='GET',path_parts=[_,<<"_all_docs">>]}=Req, Db) -> - all_docs_view(Req, Db, nil); + Keys = couch_httpd:qs_json_value(Req, "keys", nil), + all_docs_view(Req, Db, Keys); db_req(#httpd{method='POST',path_parts=[_,<<"_all_docs">>]}=Req, Db) -> + couch_httpd:validate_ctype(Req, "application/json"), {Fields} = couch_httpd:json_body_obj(Req), case couch_util:get_value(<<"keys">>, Fields, nil) of nil -> @@ -497,12 +510,13 @@ all_docs_view(Req, Db, Keys) -> nil -> FoldlFun = couch_httpd_view:make_view_fold_fun(Req, QueryArgs, CurrentEtag, Db, UpdateSeq, TotalRowCount, #view_fold_helper_funs{ - reduce_count = fun couch_db:enum_docs_reduce_to_count/1 + reduce_count = fun couch_db:enum_docs_reduce_to_count/1, + send_row = fun all_docs_send_json_view_row/6 }), AdapterFun = fun(#full_doc_info{id=Id}=FullDocInfo, Offset, Acc) -> case couch_doc:to_doc_info(FullDocInfo) of - #doc_info{revs=[#rev_info{deleted=false, rev=Rev}|_]} -> - FoldlFun({{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}]}}, Offset, Acc); + #doc_info{revs=[#rev_info{deleted=false}|_]} = DocInfo -> + FoldlFun({{Id, Id}, DocInfo}, Offset, Acc); #doc_info{revs=[#rev_info{deleted=true}|_]} -> {ok, Acc} end @@ -514,7 +528,8 @@ all_docs_view(Req, Db, Keys) -> _ -> FoldlFun = couch_httpd_view:make_view_fold_fun(Req, QueryArgs, CurrentEtag, Db, UpdateSeq, TotalRowCount, #view_fold_helper_funs{ - reduce_count = fun(Offset) -> Offset end + reduce_count = fun(Offset) -> Offset end, + send_row = fun all_docs_send_json_view_row/6 }), KeyFoldFun = case Dir of fwd -> @@ -526,10 +541,8 @@ all_docs_view(Req, Db, Keys) -> fun(Key, FoldAcc) -> DocInfo = (catch couch_db:get_doc_info(Db, Key)), Doc = case DocInfo of - {ok, #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]}} -> - {{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}]}}; - {ok, #doc_info{id=Id, revs=[#rev_info{deleted=true, rev=Rev}|_]}} -> - {{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}, {deleted, true}]}}; + {ok, #doc_info{id = Id} = Di} -> + {{Id, Id}, Di}; not_found -> {{Key, error}, not_found}; _ -> @@ -543,6 +556,33 @@ all_docs_view(Req, Db, Keys) -> end end). +all_docs_send_json_view_row(Resp, Db, KV, IncludeDocs, Conflicts, RowFront) -> + JsonRow = all_docs_view_row_obj(Db, KV, IncludeDocs, Conflicts), + send_chunk(Resp, RowFront ++ ?JSON_ENCODE(JsonRow)), + {ok, ",\r\n"}. + +all_docs_view_row_obj(_Db, {{DocId, error}, Value}, _IncludeDocs, _Conflicts) -> + {[{key, DocId}, {error, Value}]}; +all_docs_view_row_obj(Db, {_KeyDocId, DocInfo}, true, Conflicts) -> + case DocInfo of + #doc_info{revs = [#rev_info{deleted = true} | _]} -> + {all_docs_row(DocInfo) ++ [{doc, null}]}; + _ -> + {all_docs_row(DocInfo) ++ couch_httpd_view:doc_member( + Db, DocInfo, if Conflicts -> [conflicts]; true -> [] end)} + end; +all_docs_view_row_obj(_Db, {_KeyDocId, DocInfo}, _IncludeDocs, _Conflicts) -> + {all_docs_row(DocInfo)}. + +all_docs_row(#doc_info{id = Id, revs = [RevInfo | _]}) -> + #rev_info{rev = Rev, deleted = Del} = RevInfo, + [ {id, Id}, {key, Id}, + {value, {[{rev, couch_doc:rev_to_str(Rev)}] ++ case Del of + true -> [{deleted, true}]; + false -> [] + end}} ]. + + db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) -> % check for the existence of the doc to handle the 404 case. couch_doc_open(Db, DocId, nil, []), @@ -556,29 +596,26 @@ db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) -> {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]})) end; -db_doc_req(#httpd{method='GET'}=Req, Db, DocId) -> +db_doc_req(#httpd{method = 'GET', mochi_req = MochiReq} = Req, Db, DocId) -> #doc_query_args{ rev = Rev, open_revs = Revs, - options = Options, + options = Options1, atts_since = AttsSince } = parse_doc_query(Req), + Options = case AttsSince of + nil -> + Options1; + RevList when is_list(RevList) -> + [{atts_since, RevList}, attachments | Options1] + end, case Revs of [] -> - Options2 = - if AttsSince /= nil -> - [{atts_since, AttsSince}, attachments | Options]; - true -> Options - end, - Doc = couch_doc_open(Db, DocId, Rev, Options2), - send_doc(Req, Doc, Options2); + Doc = couch_doc_open(Db, DocId, Rev, Options), + send_doc(Req, Doc, Options); _ -> {ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options), - AcceptedTypes = case couch_httpd:header_value(Req, "Accept") of - undefined -> []; - AcceptHeader -> string:tokens(AcceptHeader, ", ") - end, - case lists:member("multipart/mixed", AcceptedTypes) of + case MochiReq:accepts_content_type("multipart/mixed") of false -> {ok, Resp} = start_json_response(Req, 200), send_chunk(Resp, "["), @@ -612,13 +649,12 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) -> couch_doc:validate_docid(DocId), couch_httpd:validate_ctype(Req, "multipart/form-data"), Form = couch_httpd:parse_form(Req), - case proplists:is_defined("_doc", Form) of - true -> - Json = ?JSON_DECODE(couch_util:get_value("_doc", Form)), - Doc = couch_doc_from_req(Req, DocId, Json); - false -> - Rev = couch_doc:parse_rev(list_to_binary(couch_util:get_value("_rev", Form))), - {ok, [{ok, Doc}]} = couch_db:open_doc_revs(Db, DocId, [Rev], []) + case couch_util:get_value("_doc", Form) of + undefined -> + Rev = couch_doc:parse_rev(couch_util:get_value("_rev", Form)), + {ok, [{ok, Doc}]} = couch_db:open_doc_revs(Db, DocId, [Rev], []); + Json -> + Doc = couch_doc_from_req(Req, DocId, ?JSON_DECODE(Json)) end, UpdatedAtts = [ #att{name=validate_attachment_name(Name), @@ -656,10 +692,18 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) -> RespHeaders = [{"Location", Loc}], case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of ("multipart/related;" ++ _) = ContentType -> - {ok, Doc0} = couch_doc:doc_from_multi_part_stream(ContentType, - fun() -> receive_request_data(Req) end), + {ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream( + ContentType, fun() -> receive_request_data(Req) end), Doc = couch_doc_from_req(Req, DocId, Doc0), - update_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType); + try + Result = update_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType), + WaitFun(), + Result + catch throw:Err -> + % Document rejected by a validate_doc_update function. + couch_doc:abort_multi_part_stream(Parser), + throw(Err) + end; _Else -> case couch_httpd:qs_value(Req, "batch") of "ok" -> @@ -721,20 +765,17 @@ send_doc(Req, Doc, Options) -> send_doc_efficiently(Req, #doc{atts=[]}=Doc, Headers, Options) -> send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options)); -send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) -> +send_doc_efficiently(#httpd{mochi_req = MochiReq} = Req, + #doc{atts = Atts} = Doc, Headers, Options) -> case lists:member(attachments, Options) of true -> - AcceptedTypes = case couch_httpd:header_value(Req, "Accept") of - undefined -> []; - AcceptHeader -> string:tokens(AcceptHeader, ", ") - end, - case lists:member("multipart/related", AcceptedTypes) of + case MochiReq:accepts_content_type("multipart/related") of false -> send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options)); true -> Boundary = couch_uuids:random(), JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, - [attachments, follows|Options])), + [attachments, follows, att_encoding_info | Options])), {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream( Boundary,JsonBytes, Atts, true), CType = {<<"Content-Type">>, ContentType}, @@ -776,9 +817,39 @@ send_docs_multipart(Req, Results, Options1) -> couch_httpd:send_chunk(Resp, <<"--">>), couch_httpd:last_chunk(Resp). +send_ranges_multipart(Req, ContentType, Len, Att, Ranges) -> + Boundary = couch_uuids:random(), + CType = {"Content-Type", + "multipart/byteranges; boundary=\"" ++ ?b2l(Boundary) ++ "\""}, + {ok, Resp} = start_chunked_response(Req, 206, [CType]), + couch_httpd:send_chunk(Resp, <<"--", Boundary/binary>>), + lists:foreach(fun({From, To}) -> + ContentRange = make_content_range(From, To, Len), + couch_httpd:send_chunk(Resp, + <<"\r\nContent-Type: ", ContentType/binary, "\r\n", + "Content-Range: ", ContentRange/binary, "\r\n", + "\r\n">>), + couch_doc:range_att_foldl(Att, From, To + 1, + fun(Seg, _) -> send_chunk(Resp, Seg) end, {ok, Resp}), + couch_httpd:send_chunk(Resp, <<"\r\n--", Boundary/binary>>) + end, Ranges), + couch_httpd:send_chunk(Resp, <<"--">>), + couch_httpd:last_chunk(Resp), + {ok, Resp}. + receive_request_data(Req) -> - {couch_httpd:recv(Req, 0), fun() -> receive_request_data(Req) end}. + receive_request_data(Req, couch_httpd:body_length(Req)). + +receive_request_data(Req, LenLeft) when LenLeft > 0 -> + Len = erlang:min(4096, LenLeft), + Data = couch_httpd:recv(Req, Len), + {Data, fun() -> receive_request_data(Req, LenLeft - iolist_size(Data)) end}; +receive_request_data(_Req, _) -> + throw(<<"expected more data">>). +make_content_range(From, To, Len) -> + ?l2b(io_lib:format("bytes ~B-~B/~B", [From, To, Len])). + update_doc_result_to_json({{Id, Rev}, Error}) -> {_Code, Err, Msg} = couch_httpd:error_info(Error), {[{id, Id}, {rev, couch_doc:rev_to_str(Rev)}, @@ -863,7 +934,7 @@ couch_doc_open(Db, DocId, Rev, Options) -> % Attachment request handlers -db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) -> +db_attachment_req(#httpd{method='GET',mochi_req=MochiReq}=Req, Db, DocId, FileNameParts) -> FileName = list_to_binary(mochiweb_util:join(lists:map(fun binary_to_list/1, FileNameParts),"/")), #doc_query_args{ rev=Rev, @@ -881,16 +952,6 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) -> atom_to_list(Enc), couch_httpd:accepted_encodings(Req) ), - Headers = [ - {"ETag", Etag}, - {"Cache-Control", "must-revalidate"}, - {"Content-Type", binary_to_list(Type)} - ] ++ case ReqAcceptsAttEnc of - true -> - [{"Content-Encoding", atom_to_list(Enc)}]; - _ -> - [] - end, Len = case {Enc, ReqAcceptsAttEnc} of {identity, _} -> % stored and served in identity form @@ -910,6 +971,23 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) -> % header we'll fall back to a chunked response. undefined end, + Headers = [ + {"ETag", Etag}, + {"Cache-Control", "must-revalidate"}, + {"Content-Type", binary_to_list(Type)} + ] ++ case ReqAcceptsAttEnc of + true when Enc =/= identity -> + % RFC 2616 says that the 'identify' encoding should not be used in + % the Content-Encoding header + [{"Content-Encoding", atom_to_list(Enc)}]; + _ -> + [] + end ++ case Enc of + identity -> + [{"Accept-Ranges", "bytes"}]; + _ -> + [{"Accept-Ranges", "none"}] + end, AttFun = case ReqAcceptsAttEnc of false -> fun couch_doc:att_foldl_decode/3; @@ -923,11 +1001,29 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) -> case Len of undefined -> {ok, Resp} = start_chunked_response(Req, 200, Headers), - AttFun(Att, fun(Seg, _) -> send_chunk(Resp, Seg) end, ok), + AttFun(Att, fun(Seg, _) -> send_chunk(Resp, Seg) end, {ok, Resp}), last_chunk(Resp); _ -> - {ok, Resp} = start_response_length(Req, 200, Headers, Len), - AttFun(Att, fun(Seg, _) -> send(Resp, Seg) end, ok) + Ranges = parse_ranges(MochiReq:get(range), Len), + case {Enc, Ranges} of + {identity, [{From, To}]} -> + Headers1 = [{<<"Content-Range">>, make_content_range(From, To, Len)}] + ++ Headers, + {ok, Resp} = start_response_length(Req, 206, Headers1, To - From + 1), + couch_doc:range_att_foldl(Att, From, To + 1, + fun(Seg, _) -> send(Resp, Seg) end, {ok, Resp}); + {identity, Ranges} when is_list(Ranges) -> + send_ranges_multipart(Req, Type, Len, Att, Ranges); + _ -> + Headers1 = Headers ++ + if Enc =:= identity orelse ReqAcceptsAttEnc =:= true -> + [{"Content-MD5", base64:encode(Att#att.md5)}]; + true -> + [] + end, + {ok, Resp} = start_response_length(Req, 200, Headers1, Len), + AttFun(Att, fun(Seg, _) -> send(Resp, Seg) end, {ok, Resp}) + end end end ) @@ -982,9 +1078,7 @@ db_attachment_req(#httpd{method=Method,mochi_req=MochiReq}=Req, Db, DocId, FileN end, - fun() -> couch_httpd:recv(Req, 0) end; - Length -> - exit({length_not_integer, Length}) + fun(Size) -> couch_httpd:recv(Req, Size) end end, att_len = case couch_httpd:header_value(Req,"Content-Length") of undefined -> @@ -1049,6 +1143,25 @@ db_attachment_req(#httpd{method=Method,mochi_req=MochiReq}=Req, Db, DocId, FileN db_attachment_req(Req, _Db, _DocId, _FileNameParts) -> send_method_not_allowed(Req, "DELETE,GET,HEAD,PUT"). +parse_ranges(undefined, _Len) -> + undefined; +parse_ranges(fail, _Len) -> + undefined; +parse_ranges(Ranges, Len) -> + parse_ranges(Ranges, Len, []). + +parse_ranges([], _Len, Acc) -> + lists:reverse(Acc); +parse_ranges([{From, To}|_], _Len, _Acc) when is_integer(From) andalso is_integer(To) andalso To < From -> + throw(requested_range_not_satisfiable); +parse_ranges([{From, To}|Rest], Len, Acc) when is_integer(To) andalso To >= Len -> + parse_ranges([{From, Len-1}] ++ Rest, Len, Acc); +parse_ranges([{none, To}|Rest], Len, Acc) -> + parse_ranges([{Len - To, Len - 1}] ++ Rest, Len, Acc); +parse_ranges([{From, none}|Rest], Len, Acc) -> + parse_ranges([{From, Len - 1}] ++ Rest, Len, Acc); +parse_ranges([{From,To}|Rest], Len, Acc) -> + parse_ranges(Rest, Len, [{From, To}] ++ Acc). get_md5_header(Req) -> ContentMD5 = couch_httpd:header_value(Req, "Content-MD5"), @@ -1137,6 +1250,8 @@ parse_changes_query(Req) -> Args#changes_args{timeout=list_to_integer(Value)}; {"include_docs", "true"} -> Args#changes_args{include_docs=true}; + {"conflicts", "true"} -> + Args#changes_args{conflicts=true}; {"filter", _} -> Args#changes_args{filter=Value}; _Else -> % unknown key value pair, ignore. @@ -1162,15 +1277,19 @@ extract_header_rev(Req, ExplicitRev) -> parse_copy_destination_header(Req) -> - Destination = couch_httpd:header_value(Req, "Destination"), - case re:run(Destination, "\\?", [{capture, none}]) of - nomatch -> - {list_to_binary(Destination), {0, []}}; - match -> - [DocId, RevQs] = re:split(Destination, "\\?", [{return, list}]), - [_RevQueryKey, Rev] = re:split(RevQs, "=", [{return, list}]), - {Pos, RevId} = couch_doc:parse_rev(Rev), - {list_to_binary(DocId), {Pos, [RevId]}} + case couch_httpd:header_value(Req, "Destination") of + undefined -> + throw({bad_request, "Destination header in mandatory for COPY."}); + Destination -> + case re:run(Destination, "\\?", [{capture, none}]) of + nomatch -> + {list_to_binary(Destination), {0, []}}; + match -> + [DocId, RevQs] = re:split(Destination, "\\?", [{return, list}]), + [_RevQueryKey, Rev] = re:split(RevQs, "=", [{return, list}]), + {Pos, RevId} = couch_doc:parse_rev(Rev), + {list_to_binary(DocId), {Pos, [RevId]}} + end end. validate_attachment_names(Doc) -> @@ -1183,34 +1302,8 @@ validate_attachment_name(Name) when is_list(Name) -> validate_attachment_name(<<"_",_/binary>>) -> throw({bad_request, <<"Attachment name can't start with '_'">>}); validate_attachment_name(Name) -> - case is_valid_utf8(Name) of + case couch_util:validate_utf8(Name) of true -> Name; false -> throw({bad_request, <<"Attachment name is not UTF-8 encoded">>}) end. -%% borrowed from mochijson2:json_bin_is_safe() -is_valid_utf8(<<>>) -> - true; -is_valid_utf8(<<C, Rest/binary>>) -> - case C of - $\" -> - false; - $\\ -> - false; - $\b -> - false; - $\f -> - false; - $\n -> - false; - $\r -> - false; - $\t -> - false; - C when C >= 0, C < $\s; C >= 16#7f, C =< 16#10FFFF -> - false; - C when C < 16#7f -> - is_valid_utf8(Rest); - _ -> - false - end. diff --git a/apps/couch/src/couch_httpd_external.erl b/apps/couch/src/couch_httpd_external.erl index 07202934..2e91fb50 100644 --- a/apps/couch/src/couch_httpd_external.erl +++ b/apps/couch/src/couch_httpd_external.erl @@ -56,6 +56,7 @@ process_external_req(HttpReq, Db, Name) -> json_req_obj(Req, Db) -> json_req_obj(Req, Db, null). json_req_obj(#httpd{mochi_req=Req, method=Method, + requested_path_parts=RequestedPath, path_parts=Path, req_body=ReqBody }, Db, DocId) -> @@ -65,18 +66,23 @@ json_req_obj(#httpd{mochi_req=Req, end, ParsedForm = case Req:get_primary_header_value("content-type") of "application/x-www-form-urlencoded" ++ _ -> - mochiweb_util:parse_qs(Body); + case Body of + undefined -> []; + _ -> mochiweb_util:parse_qs(Body) + end; _ -> [] end, Headers = Req:get(headers), Hlist = mochiweb_headers:to_list(Headers), {ok, Info} = couch_db:get_db_info(Db), - % add headers... + +% add headers... {[{<<"info">>, {Info}}, {<<"id">>, DocId}, {<<"uuid">>, couch_uuids:new()}, {<<"method">>, Method}, + {<<"requested_path">>, RequestedPath}, {<<"path">>, Path}, {<<"query">>, json_query_keys(to_json_terms(Req:parse_qs()))}, {<<"headers">>, to_json_terms(Hlist)}, @@ -84,7 +90,8 @@ json_req_obj(#httpd{mochi_req=Req, {<<"peer">>, ?l2b(Req:get(peer))}, {<<"form">>, to_json_terms(ParsedForm)}, {<<"cookie">>, to_json_terms(Req:parse_cookie())}, - {<<"userCtx">>, couch_util:json_user_ctx(Db)}]}. + {<<"userCtx">>, couch_util:json_user_ctx(Db)}, + {<<"secObj">>, couch_db:get_security(Db)}]}. to_json_terms(Data) -> to_json_terms(Data, []). diff --git a/apps/couch/src/couch_httpd_misc_handlers.erl b/apps/couch/src/couch_httpd_misc_handlers.erl index 7a149d11..15f0cad3 100644 --- a/apps/couch/src/couch_httpd_misc_handlers.erl +++ b/apps/couch/src/couch_httpd_misc_handlers.erl @@ -162,15 +162,6 @@ handle_config_req(#httpd{method='GET', path_parts=[_,Section]}=Req) -> KVs = [{list_to_binary(Key), list_to_binary(Value)} || {Key, Value} <- couch_config:get(Section)], send_json(Req, 200, {KVs}); -% PUT /_config/Section/Key -% "value" -handle_config_req(#httpd{method='PUT', path_parts=[_, Section, Key]}=Req) -> - ok = couch_httpd:verify_is_server_admin(Req), - Value = couch_httpd:json_body(Req), - Persist = couch_httpd:header_value(Req, "X-Couch-Persist") /= "false", - OldValue = couch_config:get(Section, Key, ""), - ok = couch_config:set(Section, Key, ?b2l(Value), Persist), - send_json(Req, 200, list_to_binary(OldValue)); % GET /_config/Section/Key handle_config_req(#httpd{method='GET', path_parts=[_, Section, Key]}=Req) -> ok = couch_httpd:verify_is_server_admin(Req), @@ -180,19 +171,86 @@ handle_config_req(#httpd{method='GET', path_parts=[_, Section, Key]}=Req) -> Value -> send_json(Req, 200, list_to_binary(Value)) end; -% DELETE /_config/Section/Key -handle_config_req(#httpd{method='DELETE',path_parts=[_,Section,Key]}=Req) -> +% PUT or DELETE /_config/Section/Key +handle_config_req(#httpd{method=Method, path_parts=[_, Section, Key]}=Req) + when (Method == 'PUT') or (Method == 'DELETE') -> ok = couch_httpd:verify_is_server_admin(Req), Persist = couch_httpd:header_value(Req, "X-Couch-Persist") /= "false", + case couch_config:get(<<"httpd">>, <<"config_whitelist">>, null) of + null -> + % No whitelist; allow all changes. + handle_approved_config_req(Req, Persist); + WhitelistValue -> + % Provide a failsafe to protect against inadvertently locking + % onesself out of the config by supplying a syntactically-incorrect + % Erlang term. To intentionally lock down the whitelist, supply a + % well-formed list which does not include the whitelist config + % variable itself. + FallbackWhitelist = [{<<"httpd">>, <<"config_whitelist">>}], + + Whitelist = case couch_util:parse_term(WhitelistValue) of + {ok, Value} when is_list(Value) -> + Value; + {ok, _NonListValue} -> + FallbackWhitelist; + {error, _} -> + [{WhitelistSection, WhitelistKey}] = FallbackWhitelist, + ?LOG_ERROR("Only whitelisting ~s/~s due to error parsing: ~p", + [WhitelistSection, WhitelistKey, WhitelistValue]), + FallbackWhitelist + end, + + IsRequestedKeyVal = fun(Element) -> + case Element of + {A, B} -> + % For readability, tuples may be used instead of binaries + % in the whitelist. + case {couch_util:to_binary(A), couch_util:to_binary(B)} of + {Section, Key} -> + true; + {Section, <<"*">>} -> + true; + _Else -> + false + end; + _Else -> + false + end + end, + + case lists:any(IsRequestedKeyVal, Whitelist) of + true -> + % Allow modifying this whitelisted variable. + handle_approved_config_req(Req, Persist); + _NotWhitelisted -> + % Disallow modifying this non-whitelisted variable. + send_error(Req, 400, <<"modification_not_allowed">>, + ?l2b("This config variable is read-only")) + end + end; +handle_config_req(Req) -> + send_method_not_allowed(Req, "GET,PUT,DELETE"). + +% PUT /_config/Section/Key +% "value" +handle_approved_config_req(#httpd{method='PUT', path_parts=[_, Section, Key]}=Req, Persist) -> + Value = couch_httpd:json_body(Req), + OldValue = couch_config:get(Section, Key, ""), + case couch_config:set(Section, Key, ?b2l(Value), Persist) of + ok -> + send_json(Req, 200, list_to_binary(OldValue)); + Error -> + throw(Error) + end; +% DELETE /_config/Section/Key +handle_approved_config_req(#httpd{method='DELETE',path_parts=[_,Section,Key]}=Req, Persist) -> case couch_config:get(Section, Key, null) of null -> throw({not_found, unknown_config_value}); OldValue -> couch_config:delete(Section, Key, Persist), send_json(Req, 200, list_to_binary(OldValue)) - end; -handle_config_req(Req) -> - send_method_not_allowed(Req, "GET,PUT,DELETE"). + end. % httpd db handlers diff --git a/apps/couch/src/couch_httpd_proxy.erl b/apps/couch/src/couch_httpd_proxy.erl new file mode 100644 index 00000000..c196f72d --- /dev/null +++ b/apps/couch/src/couch_httpd_proxy.erl @@ -0,0 +1,431 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. +-module(couch_httpd_proxy). + +-export([handle_proxy_req/2]). + +-include("couch_db.hrl"). +-include_lib("ibrowse/include/ibrowse.hrl"). + +-define(TIMEOUT, infinity). +-define(PKT_SIZE, 4096). + + +handle_proxy_req(Req, ProxyDest) -> + + %% Bug in Mochiweb? + %% Reported here: http://github.com/mochi/mochiweb/issues/issue/16 + erase(mochiweb_request_body_length), + + Method = get_method(Req), + Url = get_url(Req, ProxyDest), + Version = get_version(Req), + Headers = get_headers(Req), + Body = get_body(Req), + Options = [ + {http_vsn, Version}, + {headers_as_is, true}, + {response_format, binary}, + {stream_to, {self(), once}} + ], + case ibrowse:send_req(Url, Headers, Method, Body, Options, ?TIMEOUT) of + {ibrowse_req_id, ReqId} -> + stream_response(Req, ProxyDest, ReqId); + {error, Reason} -> + throw({error, Reason}) + end. + + +get_method(#httpd{mochi_req=MochiReq}) -> + case MochiReq:get(method) of + Method when is_atom(Method) -> + list_to_atom(string:to_lower(atom_to_list(Method))); + Method when is_list(Method) -> + list_to_atom(string:to_lower(Method)); + Method when is_binary(Method) -> + list_to_atom(string:to_lower(?b2l(Method))) + end. + + +get_url(Req, ProxyDest) when is_binary(ProxyDest) -> + get_url(Req, ?b2l(ProxyDest)); +get_url(#httpd{mochi_req=MochiReq}=Req, ProxyDest) -> + BaseUrl = case mochiweb_util:partition(ProxyDest, "/") of + {[], "/", _} -> couch_httpd:absolute_uri(Req, ProxyDest); + _ -> ProxyDest + end, + ProxyPrefix = "/" ++ ?b2l(hd(Req#httpd.path_parts)), + RequestedPath = MochiReq:get(raw_path), + case mochiweb_util:partition(RequestedPath, ProxyPrefix) of + {[], ProxyPrefix, []} -> + BaseUrl; + {[], ProxyPrefix, [$/ | DestPath]} -> + remove_trailing_slash(BaseUrl) ++ "/" ++ DestPath; + {[], ProxyPrefix, DestPath} -> + remove_trailing_slash(BaseUrl) ++ "/" ++ DestPath; + _Else -> + throw({invalid_url_path, {ProxyPrefix, RequestedPath}}) + end. + +get_version(#httpd{mochi_req=MochiReq}) -> + MochiReq:get(version). + + +get_headers(#httpd{mochi_req=MochiReq}) -> + to_ibrowse_headers(mochiweb_headers:to_list(MochiReq:get(headers)), []). + +to_ibrowse_headers([], Acc) -> + lists:reverse(Acc); +to_ibrowse_headers([{K, V} | Rest], Acc) when is_atom(K) -> + to_ibrowse_headers([{atom_to_list(K), V} | Rest], Acc); +to_ibrowse_headers([{K, V} | Rest], Acc) when is_list(K) -> + case string:to_lower(K) of + "content-length" -> + to_ibrowse_headers(Rest, [{content_length, V} | Acc]); + % This appears to make ibrowse too smart. + %"transfer-encoding" -> + % to_ibrowse_headers(Rest, [{transfer_encoding, V} | Acc]); + _ -> + to_ibrowse_headers(Rest, [{K, V} | Acc]) + end. + +get_body(#httpd{method='GET'}) -> + fun() -> eof end; +get_body(#httpd{method='HEAD'}) -> + fun() -> eof end; +get_body(#httpd{method='DELETE'}) -> + fun() -> eof end; +get_body(#httpd{mochi_req=MochiReq}) -> + case MochiReq:get(body_length) of + undefined -> + <<>>; + {unknown_transfer_encoding, Unknown} -> + exit({unknown_transfer_encoding, Unknown}); + chunked -> + {fun stream_chunked_body/1, {init, MochiReq, 0}}; + 0 -> + <<>>; + Length when is_integer(Length) andalso Length > 0 -> + {fun stream_length_body/1, {init, MochiReq, Length}}; + Length -> + exit({invalid_body_length, Length}) + end. + + +remove_trailing_slash(Url) -> + rem_slash(lists:reverse(Url)). + +rem_slash([]) -> + []; +rem_slash([$\s | RevUrl]) -> + rem_slash(RevUrl); +rem_slash([$\t | RevUrl]) -> + rem_slash(RevUrl); +rem_slash([$\r | RevUrl]) -> + rem_slash(RevUrl); +rem_slash([$\n | RevUrl]) -> + rem_slash(RevUrl); +rem_slash([$/ | RevUrl]) -> + rem_slash(RevUrl); +rem_slash(RevUrl) -> + lists:reverse(RevUrl). + + +stream_chunked_body({init, MReq, 0}) -> + % First chunk, do expect-continue dance. + init_body_stream(MReq), + stream_chunked_body({stream, MReq, 0, [], ?PKT_SIZE}); +stream_chunked_body({stream, MReq, 0, Buf, BRem}) -> + % Finished a chunk, get next length. If next length + % is 0, its time to try and read trailers. + {CRem, Data} = read_chunk_length(MReq), + case CRem of + 0 -> + BodyData = lists:reverse(Buf, Data), + {ok, BodyData, {trailers, MReq, [], ?PKT_SIZE}}; + _ -> + stream_chunked_body( + {stream, MReq, CRem, [Data | Buf], BRem-size(Data)} + ) + end; +stream_chunked_body({stream, MReq, CRem, Buf, BRem}) when BRem =< 0 -> + % Time to empty our buffers to the upstream socket. + BodyData = lists:reverse(Buf), + {ok, BodyData, {stream, MReq, CRem, [], ?PKT_SIZE}}; +stream_chunked_body({stream, MReq, CRem, Buf, BRem}) -> + % Buffer some more data from the client. + Length = lists:min([CRem, BRem]), + Socket = MReq:get(socket), + NewState = case mochiweb_socket:recv(Socket, Length, ?TIMEOUT) of + {ok, Data} when size(Data) == CRem -> + case mochiweb_socket:recv(Socket, 2, ?TIMEOUT) of + {ok, <<"\r\n">>} -> + {stream, MReq, 0, [<<"\r\n">>, Data | Buf], BRem-Length-2}; + _ -> + exit(normal) + end; + {ok, Data} -> + {stream, MReq, CRem-Length, [Data | Buf], BRem-Length}; + _ -> + exit(normal) + end, + stream_chunked_body(NewState); +stream_chunked_body({trailers, MReq, Buf, BRem}) when BRem =< 0 -> + % Empty our buffers and send data upstream. + BodyData = lists:reverse(Buf), + {ok, BodyData, {trailers, MReq, [], ?PKT_SIZE}}; +stream_chunked_body({trailers, MReq, Buf, BRem}) -> + % Read another trailer into the buffer or stop on an + % empty line. + Socket = MReq:get(socket), + mochiweb_socket:setopts(Socket, [{packet, line}]), + case mochiweb_socket:recv(Socket, 0, ?TIMEOUT) of + {ok, <<"\r\n">>} -> + mochiweb_socket:setopts(Socket, [{packet, raw}]), + BodyData = lists:reverse(Buf, <<"\r\n">>), + {ok, BodyData, eof}; + {ok, Footer} -> + mochiweb_socket:setopts(Socket, [{packet, raw}]), + NewState = {trailers, MReq, [Footer | Buf], BRem-size(Footer)}, + stream_chunked_body(NewState); + _ -> + exit(normal) + end; +stream_chunked_body(eof) -> + % Tell ibrowse we're done sending data. + eof. + + +stream_length_body({init, MochiReq, Length}) -> + % Do the expect-continue dance + init_body_stream(MochiReq), + stream_length_body({stream, MochiReq, Length}); +stream_length_body({stream, _MochiReq, 0}) -> + % Finished streaming. + eof; +stream_length_body({stream, MochiReq, Length}) -> + BufLen = lists:min([Length, ?PKT_SIZE]), + case MochiReq:recv(BufLen) of + <<>> -> eof; + Bin -> {ok, Bin, {stream, MochiReq, Length-BufLen}} + end. + + +init_body_stream(MochiReq) -> + Expect = case MochiReq:get_header_value("expect") of + undefined -> + undefined; + Value when is_list(Value) -> + string:to_lower(Value) + end, + case Expect of + "100-continue" -> + MochiReq:start_raw_response({100, gb_trees:empty()}); + _Else -> + ok + end. + + +read_chunk_length(MochiReq) -> + Socket = MochiReq:get(socket), + mochiweb_socket:setopts(Socket, [{packet, line}]), + case mochiweb_socket:recv(Socket, 0, ?TIMEOUT) of + {ok, Header} -> + mochiweb_socket:setopts(Socket, [{packet, raw}]), + Splitter = fun(C) -> + C =/= $\r andalso C =/= $\n andalso C =/= $\s + end, + {Hex, _Rest} = lists:splitwith(Splitter, ?b2l(Header)), + {mochihex:to_int(Hex), Header}; + _ -> + exit(normal) + end. + + +stream_response(Req, ProxyDest, ReqId) -> + receive + {ibrowse_async_headers, ReqId, "100", _} -> + % ibrowse doesn't handle 100 Continue responses which + % means we have to discard them so the proxy client + % doesn't get confused. + ibrowse:stream_next(ReqId), + stream_response(Req, ProxyDest, ReqId); + {ibrowse_async_headers, ReqId, Status, Headers} -> + {Source, Dest} = get_urls(Req, ProxyDest), + FixedHeaders = fix_headers(Source, Dest, Headers, []), + case body_length(FixedHeaders) of + chunked -> + {ok, Resp} = couch_httpd:start_chunked_response( + Req, list_to_integer(Status), FixedHeaders + ), + ibrowse:stream_next(ReqId), + stream_chunked_response(Req, ReqId, Resp), + {ok, Resp}; + Length when is_integer(Length) -> + {ok, Resp} = couch_httpd:start_response_length( + Req, list_to_integer(Status), FixedHeaders, Length + ), + ibrowse:stream_next(ReqId), + stream_length_response(Req, ReqId, Resp), + {ok, Resp}; + _ -> + {ok, Resp} = couch_httpd:start_response( + Req, list_to_integer(Status), FixedHeaders + ), + ibrowse:stream_next(ReqId), + stream_length_response(Req, ReqId, Resp), + % XXX: MochiWeb apparently doesn't look at the + % response to see if it must force close the + % connection. So we help it out here. + erlang:put(mochiweb_request_force_close, true), + {ok, Resp} + end + end. + + +stream_chunked_response(Req, ReqId, Resp) -> + receive + {ibrowse_async_response, ReqId, {error, Reason}} -> + throw({error, Reason}); + {ibrowse_async_response, ReqId, Chunk} -> + couch_httpd:send_chunk(Resp, Chunk), + ibrowse:stream_next(ReqId), + stream_chunked_response(Req, ReqId, Resp); + {ibrowse_async_response_end, ReqId} -> + couch_httpd:last_chunk(Resp) + end. + + +stream_length_response(Req, ReqId, Resp) -> + receive + {ibrowse_async_response, ReqId, {error, Reason}} -> + throw({error, Reason}); + {ibrowse_async_response, ReqId, Chunk} -> + couch_httpd:send(Resp, Chunk), + ibrowse:stream_next(ReqId), + stream_length_response(Req, ReqId, Resp); + {ibrowse_async_response_end, ReqId} -> + ok + end. + + +get_urls(Req, ProxyDest) -> + SourceUrl = couch_httpd:absolute_uri(Req, "/" ++ hd(Req#httpd.path_parts)), + Source = parse_url(?b2l(iolist_to_binary(SourceUrl))), + case (catch parse_url(ProxyDest)) of + Dest when is_record(Dest, url) -> + {Source, Dest}; + _ -> + DestUrl = couch_httpd:absolute_uri(Req, ProxyDest), + {Source, parse_url(DestUrl)} + end. + + +fix_headers(_, _, [], Acc) -> + lists:reverse(Acc); +fix_headers(Source, Dest, [{K, V} | Rest], Acc) -> + Fixed = case string:to_lower(K) of + "location" -> rewrite_location(Source, Dest, V); + "content-location" -> rewrite_location(Source, Dest, V); + "uri" -> rewrite_location(Source, Dest, V); + "destination" -> rewrite_location(Source, Dest, V); + "set-cookie" -> rewrite_cookie(Source, Dest, V); + _ -> V + end, + fix_headers(Source, Dest, Rest, [{K, Fixed} | Acc]). + + +rewrite_location(Source, #url{host=Host, port=Port, protocol=Proto}, Url) -> + case (catch parse_url(Url)) of + #url{host=Host, port=Port, protocol=Proto} = Location -> + DestLoc = #url{ + protocol=Source#url.protocol, + host=Source#url.host, + port=Source#url.port, + path=join_url_path(Source#url.path, Location#url.path) + }, + url_to_url(DestLoc); + #url{} -> + Url; + _ -> + url_to_url(Source#url{path=join_url_path(Source#url.path, Url)}) + end. + + +rewrite_cookie(_Source, _Dest, Cookie) -> + Cookie. + + +parse_url(Url) when is_binary(Url) -> + ibrowse_lib:parse_url(?b2l(Url)); +parse_url(Url) when is_list(Url) -> + ibrowse_lib:parse_url(?b2l(iolist_to_binary(Url))). + + +join_url_path(Src, Dst) -> + Src2 = case lists:reverse(Src) of + "/" ++ RestSrc -> lists:reverse(RestSrc); + _ -> Src + end, + Dst2 = case Dst of + "/" ++ RestDst -> RestDst; + _ -> Dst + end, + Src2 ++ "/" ++ Dst2. + + +url_to_url(#url{host=Host, port=Port, path=Path, protocol=Proto} = Url) -> + LPort = case {Proto, Port} of + {http, 80} -> ""; + {https, 443} -> ""; + _ -> ":" ++ integer_to_list(Port) + end, + LPath = case Path of + "/" ++ _RestPath -> Path; + _ -> "/" ++ Path + end, + HostPart = case Url#url.host_type of + ipv6_address -> + "[" ++ Host ++ "]"; + _ -> + Host + end, + atom_to_list(Proto) ++ "://" ++ HostPart ++ LPort ++ LPath. + + +body_length(Headers) -> + case is_chunked(Headers) of + true -> chunked; + _ -> content_length(Headers) + end. + + +is_chunked([]) -> + false; +is_chunked([{K, V} | Rest]) -> + case string:to_lower(K) of + "transfer-encoding" -> + string:to_lower(V) == "chunked"; + _ -> + is_chunked(Rest) + end. + +content_length([]) -> + undefined; +content_length([{K, V} | Rest]) -> + case string:to_lower(K) of + "content-length" -> + list_to_integer(V); + _ -> + content_length(Rest) + end. + diff --git a/apps/couch/src/couch_httpd_rewrite.erl b/apps/couch/src/couch_httpd_rewrite.erl index 6c3d0e3c..8480c1e9 100644 --- a/apps/couch/src/couch_httpd_rewrite.erl +++ b/apps/couch/src/couch_httpd_rewrite.erl @@ -117,8 +117,7 @@ handle_rewrite_req(#httpd{ % we are in a design handler DesignId = <<"_design/", DesignName/binary>>, Prefix = <<"/", DbName/binary, "/", DesignId/binary>>, - QueryList = couch_httpd:qs(Req), - QueryList1 = [{to_binding(K), V} || {K, V} <- QueryList], + QueryList = lists:map(fun decode_query_value/1, couch_httpd:qs(Req)), #doc{body={Props}} = DDoc, @@ -133,10 +132,11 @@ handle_rewrite_req(#httpd{ Rules -> % create dispatch list from rules DispatchList = [make_rule(Rule) || {Rule} <- Rules], + Method1 = couch_util:to_binary(Method), %% get raw path by matching url to a rule. - RawPath = case try_bind_path(DispatchList, couch_util:to_binary(Method), PathParts, - QueryList1) of + RawPath = case try_bind_path(DispatchList, Method1, + PathParts, QueryList) of no_dispatch_path -> throw(not_found); {NewPathParts, Bindings} -> @@ -144,12 +144,13 @@ handle_rewrite_req(#httpd{ % build new path, reencode query args, eventually convert % them to json - Path = lists:append( - string:join(Parts, [?SEPARATOR]), - case Bindings of - [] -> []; - _ -> [$?, encode_query(Bindings)] - end), + Bindings1 = maybe_encode_bindings(Bindings), + Path = binary_to_list( + iolist_to_binary([ + string:join(Parts, [?SEPARATOR]), + [["?", mochiweb_util:urlencode(Bindings1)] + || Bindings1 =/= [] ] + ])), % if path is relative detect it and rewrite path case mochiweb_util:safe_relative_path(Path) of @@ -196,7 +197,7 @@ quote_plus(X) -> try_bind_path([], _Method, _PathParts, _QueryList) -> no_dispatch_path; try_bind_path([Dispatch|Rest], Method, PathParts, QueryList) -> - [{PathParts1, Method1}, RedirectPath, QueryArgs] = Dispatch, + [{PathParts1, Method1}, RedirectPath, QueryArgs, Formats] = Dispatch, case bind_method(Method1, Method) of true -> case bind_path(PathParts1, PathParts, []) of @@ -204,7 +205,8 @@ try_bind_path([Dispatch|Rest], Method, PathParts, QueryList) -> Bindings1 = Bindings ++ QueryList, % we parse query args from the rule and fill % it eventually with bindings vars - QueryArgs1 = make_query_list(QueryArgs, Bindings1, []), + QueryArgs1 = make_query_list(QueryArgs, Bindings1, + Formats, []), % remove params in QueryLists1 that are already in % QueryArgs1 Bindings2 = lists:foldl(fun({K, V}, Acc) -> @@ -230,56 +232,79 @@ try_bind_path([Dispatch|Rest], Method, PathParts, QueryList) -> %% rewriting dynamically the quey list given as query member in %% rewrites. Each value is replaced by one binding or an argument %% passed in url. -make_query_list([], _Bindings, Acc) -> +make_query_list([], _Bindings, _Formats, Acc) -> Acc; -make_query_list([{Key, {Value}}|Rest], Bindings, Acc) -> - Value1 = to_json({Value}), - make_query_list(Rest, Bindings, [{to_binding(Key), Value1}|Acc]); -make_query_list([{Key, Value}|Rest], Bindings, Acc) when is_binary(Value) -> - Value1 = replace_var(Key, Value, Bindings), - make_query_list(Rest, Bindings, [{to_binding(Key), Value1}|Acc]); -make_query_list([{Key, Value}|Rest], Bindings, Acc) when is_list(Value) -> - Value1 = replace_var(Key, Value, Bindings), - make_query_list(Rest, Bindings, [{to_binding(Key), Value1}|Acc]); -make_query_list([{Key, Value}|Rest], Bindings, Acc) -> - make_query_list(Rest, Bindings, [{to_binding(Key), Value}|Acc]). - -replace_var(Key, Value, Bindings) -> - case Value of - <<":", Var/binary>> -> - get_var(Var, Bindings, Value); - _ when is_list(Value) -> - Value1 = lists:foldr(fun(V, Acc) -> - V1 = case V of - <<":", VName/binary>> -> - case get_var(VName, Bindings, V) of - V2 when is_list(V2) -> - iolist_to_binary(V2); - V2 -> V2 - end; - _ -> - - V - end, - [V1|Acc] - end, [], Value), - to_json(Value1); - _ when is_binary(Value) -> - Value; - _ -> - case Key of - <<"key">> -> to_json(Value); - <<"startkey">> -> to_json(Value); - <<"endkey">> -> to_json(Value); - _ -> - lists:flatten(?JSON_ENCODE(Value)) - end +make_query_list([{Key, {Value}}|Rest], Bindings, Formats, Acc) -> + Value1 = {Value}, + make_query_list(Rest, Bindings, Formats, [{to_binding(Key), Value1}|Acc]); +make_query_list([{Key, Value}|Rest], Bindings, Formats, Acc) when is_binary(Value) -> + Value1 = replace_var(Value, Bindings, Formats), + make_query_list(Rest, Bindings, Formats, [{to_binding(Key), Value1}|Acc]); +make_query_list([{Key, Value}|Rest], Bindings, Formats, Acc) when is_list(Value) -> + Value1 = replace_var(Value, Bindings, Formats), + make_query_list(Rest, Bindings, Formats, [{to_binding(Key), Value1}|Acc]); +make_query_list([{Key, Value}|Rest], Bindings, Formats, Acc) -> + make_query_list(Rest, Bindings, Formats, [{to_binding(Key), Value}|Acc]). + +replace_var(<<"*">>=Value, Bindings, Formats) -> + get_var(Value, Bindings, Value, Formats); +replace_var(<<":", Var/binary>> = Value, Bindings, Formats) -> + get_var(Var, Bindings, Value, Formats); +replace_var(Value, _Bindings, _Formats) when is_binary(Value) -> + Value; +replace_var(Value, Bindings, Formats) when is_list(Value) -> + lists:reverse(lists:foldl(fun + (<<":", Var/binary>>=Value1, Acc) -> + [get_var(Var, Bindings, Value1, Formats)|Acc]; + (Value1, Acc) -> + [Value1|Acc] + end, [], Value)); +replace_var(Value, _Bindings, _Formats) -> + Value. + +maybe_json(Key, Value) -> + case lists:member(Key, [<<"key">>, <<"startkey">>, <<"start_key">>, + <<"endkey">>, <<"end_key">>, <<"keys">>]) of + true -> + ?JSON_ENCODE(Value); + false -> + Value end. - -get_var(VarName, Props, Default) -> +get_var(VarName, Props, Default, Formats) -> VarName1 = to_binding(VarName), - couch_util:get_value(VarName1, Props, Default). + Val = couch_util:get_value(VarName1, Props, Default), + maybe_format(VarName, Val, Formats). + +maybe_format(VarName, Value, Formats) -> + case couch_util:get_value(VarName, Formats) of + undefined -> + Value; + Format -> + format(Format, Value) + end. + +format(<<"int">>, Value) when is_integer(Value) -> + Value; +format(<<"int">>, Value) when is_binary(Value) -> + format(<<"int">>, ?b2l(Value)); +format(<<"int">>, Value) when is_list(Value) -> + case (catch list_to_integer(Value)) of + IntVal when is_integer(IntVal) -> + IntVal; + _ -> + Value + end; +format(<<"bool">>, Value) when is_binary(Value) -> + format(<<"bool">>, ?b2l(Value)); +format(<<"bool">>, Value) when is_list(Value) -> + case string:to_lower(Value) of + "true" -> true; + "false" -> false; + _ -> Value + end; +format(_Format, Value) -> + Value. %% doc: build new patch from bindings. bindings are query args %% (+ dynamic query rewritten if needed) and bindings found in @@ -295,7 +320,8 @@ make_new_path([?MATCH_ALL|_Rest], _Bindings, Remaining, Acc) -> make_new_path([{bind, P}|Rest], Bindings, Remaining, Acc) -> P2 = case couch_util:get_value({bind, P}, Bindings) of undefined -> << "undefined">>; - P1 -> P1 + P1 -> + iolist_to_binary(P1) end, make_new_path(Rest, Bindings, Remaining, [P2|Acc]); make_new_path([P|Rest], Bindings, Remaining, Acc) -> @@ -306,7 +332,7 @@ make_new_path([P|Rest], Bindings, Remaining, Acc) -> %% method rule is '*', which is the default, all %% request method will bind. It allows us to make rules %% depending on HTTP method. -bind_method(?MATCH_ALL, _Method) -> +bind_method(?MATCH_ALL, _Method ) -> true; bind_method({bind, Method}, Method) -> true; @@ -318,8 +344,8 @@ bind_method(_, _) -> %% to the current url by pattern matching bind_path([], [], Bindings) -> {ok, [], Bindings}; -bind_path([?MATCH_ALL], Rest, Bindings) when is_list(Rest) -> - {ok, Rest, Bindings}; +bind_path([?MATCH_ALL], [Match|_RestMatch]=Rest, Bindings) -> + {ok, Rest, [{?MATCH_ALL, Match}|Bindings]}; bind_path(_, [], _) -> fail; bind_path([{bind, Token}|RestToken],[Match|RestMatch],Bindings) -> @@ -372,7 +398,11 @@ make_rule(Rule) -> To -> parse_path(To) end, - [{FromParts, Method}, ToParts, QueryArgs]. + Formats = case couch_util:get_value(<<"formats">>, Rule) of + undefined -> []; + {Fmts} -> Fmts + end, + [{FromParts, Method}, ToParts, QueryArgs, Formats]. parse_path(Path) -> {ok, SlashRE} = re:compile(<<"\\/">>), @@ -405,17 +435,25 @@ path_to_list([P|R], Acc, DotDotCount) -> end, path_to_list(R, [P1|Acc], DotDotCount). -encode_query(Props) -> - Props1 = lists:foldl(fun ({{bind, K}, V}, Acc) -> - V1 = case is_list(V) orelse is_binary(V) of - true -> V; - false -> - % probably it's a number - quote_plus(V) - end, - [{K, V1} | Acc] - end, [], Props), - lists:flatten(mochiweb_util:urlencode(Props1)). +maybe_encode_bindings([]) -> + []; +maybe_encode_bindings(Props) -> + lists:foldl(fun + ({{bind, <<"*">>}, _V}, Acc) -> + Acc; + ({{bind, K}, V}, Acc) -> + V1 = iolist_to_binary(maybe_json(K, V)), + [{K, V1}|Acc] + end, [], Props). + +decode_query_value({K,V}) -> + case lists:member(K, ["key", "startkey", "start_key", + "endkey", "end_key", "keys"]) of + true -> + {to_binding(K), ?JSON_DECODE(V)}; + false -> + {to_binding(K), ?l2b(V)} + end. to_binding({bind, V}) -> {bind, V}; @@ -423,6 +461,3 @@ to_binding(V) when is_list(V) -> to_binding(?l2b(V)); to_binding(V) -> {bind, V}. - -to_json(V) -> - iolist_to_binary(?JSON_ENCODE(V)). diff --git a/apps/couch/src/couch_httpd_show.erl b/apps/couch/src/couch_httpd_show.erl index d50ca83a..59f74e1c 100644 --- a/apps/couch/src/couch_httpd_show.erl +++ b/apps/couch/src/couch_httpd_show.erl @@ -153,12 +153,14 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) -> % view-list request with view and list from same design doc. handle_view_list_req(#httpd{method='GET', path_parts=[_, _, DesignName, _, ListName, ViewName]}=Req, Db, DDoc) -> - handle_view_list(Req, Db, DDoc, ListName, {DesignName, ViewName}, nil); + Keys = couch_httpd:qs_json_value(Req, "keys", nil), + handle_view_list(Req, Db, DDoc, ListName, {DesignName, ViewName}, Keys); % view-list request with view and list from different design docs. handle_view_list_req(#httpd{method='GET', path_parts=[_, _, _, _, ListName, ViewDesignName, ViewName]}=Req, Db, DDoc) -> - handle_view_list(Req, Db, DDoc, ListName, {ViewDesignName, ViewName}, nil); + Keys = couch_httpd:qs_json_value(Req, "keys", nil), + handle_view_list(Req, Db, DDoc, ListName, {ViewDesignName, ViewName}, Keys); handle_view_list_req(#httpd{method='GET'}=Req, _Db, _DDoc) -> send_error(Req, 404, <<"list_error">>, <<"Invalid path.">>); @@ -188,14 +190,14 @@ handle_view_list_req(Req, _Db, _DDoc) -> handle_view_list(Req, Db, DDoc, LName, {ViewDesignName, ViewName}, Keys) -> ViewDesignId = <<"_design/", ViewDesignName/binary>>, {ViewType, View, Group, QueryArgs} = couch_httpd_view:load_view(Req, Db, {ViewDesignId, ViewName}, Keys), - Etag = list_etag(Req, Db, Group, {couch_httpd:doc_etag(DDoc), Keys}), + Etag = list_etag(Req, Db, Group, View, {couch_httpd:doc_etag(DDoc), Keys}), couch_httpd:etag_respond(Req, Etag, fun() -> output_list(ViewType, Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys, Group) end). -list_etag(#httpd{user_ctx=UserCtx}=Req, Db, Group, More) -> +list_etag(#httpd{user_ctx=UserCtx}=Req, Db, Group, View, More) -> Accept = couch_httpd:header_value(Req, "Accept"), - couch_httpd_view:view_group_etag(Group, Db, {More, Accept, UserCtx#user_ctx.roles}). + couch_httpd_view:view_etag(Db, Group, View, {More, Accept, UserCtx#user_ctx.roles}). output_list(map, Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys, Group) -> output_map_list(Req, Db, DDoc, LName, View, QueryArgs, Etag, Keys, Group); @@ -307,18 +309,20 @@ start_list_resp(QServer, LName, Req, Db, Head, Etag) -> {ok, Resp, ?b2l(?l2b(Chunks))}. make_map_send_row_fun(QueryServer) -> - fun(Resp, Db, Row, IncludeDocs, RowFront) -> - send_list_row(Resp, QueryServer, Db, Row, RowFront, IncludeDocs) + fun(Resp, Db, Row, IncludeDocs, Conflicts, RowFront) -> + send_list_row( + Resp, QueryServer, Db, Row, RowFront, IncludeDocs, Conflicts) end. make_reduce_send_row_fun(QueryServer, Db) -> fun(Resp, Row, RowFront) -> - send_list_row(Resp, QueryServer, Db, Row, RowFront, false) + send_list_row(Resp, QueryServer, Db, Row, RowFront, false, false) end. -send_list_row(Resp, QueryServer, Db, Row, RowFront, IncludeDoc) -> +send_list_row(Resp, QueryServer, Db, Row, RowFront, IncludeDoc, Conflicts) -> try - [Go,Chunks] = prompt_list_row(QueryServer, Db, Row, IncludeDoc), + [Go,Chunks] = prompt_list_row( + QueryServer, Db, Row, IncludeDoc, Conflicts), Chunk = RowFront ++ ?b2l(?l2b(Chunks)), send_non_empty_chunk(Resp, Chunk), case Go of @@ -334,11 +338,12 @@ send_list_row(Resp, QueryServer, Db, Row, RowFront, IncludeDoc) -> end. -prompt_list_row({Proc, _DDocId}, Db, {{Key, DocId}, Value}, IncludeDoc) -> - JsonRow = couch_httpd_view:view_row_obj(Db, {{Key, DocId}, Value}, IncludeDoc), +prompt_list_row({Proc, _DDocId}, Db, {{_Key, _DocId}, _} = Kv, + IncludeDoc, Conflicts) -> + JsonRow = couch_httpd_view:view_row_obj(Db, Kv, IncludeDoc, Conflicts), couch_query_servers:proc_prompt(Proc, [<<"list_row">>, JsonRow]); -prompt_list_row({Proc, _DDocId}, _, {Key, Value}, _IncludeDoc) -> +prompt_list_row({Proc, _DDocId}, _, {Key, Value}, _IncludeDoc, _Conflicts) -> JsonRow = {[{key, Key}, {value, Value}]}, couch_query_servers:proc_prompt(Proc, [<<"list_row">>, JsonRow]). diff --git a/apps/couch/src/couch_httpd_vhost.erl b/apps/couch/src/couch_httpd_vhost.erl new file mode 100644 index 00000000..9bfb5951 --- /dev/null +++ b/apps/couch/src/couch_httpd_vhost.erl @@ -0,0 +1,403 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + + +-module(couch_httpd_vhost). +-behaviour(gen_server). + +-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]). +-export([code_change/3, terminate/2]). +-export([match_vhost/1, urlsplit_netloc/2]). +-export([redirect_to_vhost/2]). + +-include("couch_db.hrl"). + +-define(SEPARATOR, $\/). +-define(MATCH_ALL, {bind, '*'}). + +-record(vhosts, { + vhost_globals, + vhosts = [], + vhost_fun +}). + + +%% doc the vhost manager. +%% This gen_server keep state of vhosts added to the ini and try to +%% match the Host header (or forwarded) against rules built against +%% vhost list. +%% +%% Declaration of vhosts take place in the configuration file : +%% +%% [vhosts] +%% example.com = /example +%% *.example.com = /example +%% +%% The first line will rewrite the rquest to display the content of the +%% example database. This rule works only if the Host header is +%% 'example.com' and won't work for CNAMEs. Second rule on the other hand +%% match all CNAMES to example db. So www.example.com or db.example.com +%% will work. +%% +%% The wildcard ('*') should always be the last in the cnames: +%% +%% "*.db.example.com = /" will match all cname on top of db +%% examples to the root of the machine. +%% +%% +%% Rewriting Hosts to path +%% ----------------------- +%% +%% Like in the _rewrite handler you could match some variable and use +%them to create the target path. Some examples: +%% +%% [vhosts] +%% *.example.com = /* +%% :dbname.example.com = /:dbname +%% :ddocname.:dbname.example.com = /:dbname/_design/:ddocname/_rewrite +%% +%% First rule pass wildcard as dbname, second do the same but use a +%% variable name and the third one allows you to use any app with +%% @ddocname in any db with @dbname . +%% +%% You could also change the default function to handle request by +%% changing the setting `redirect_vhost_handler` in `httpd` section of +%% the Ini: +%% +%% [httpd] +%% redirect_vhost_handler = {Module, Fun} +%% +%% The function take 2 args : the mochiweb request object and the target +%%% path. + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%% @doc Try to find a rule matching current Host heade. some rule is +%% found it rewrite the Mochiweb Request else it return current Request. +match_vhost(MochiReq) -> + {ok, MochiReq1} = gen_server:call(couch_httpd_vhost, {match_vhost, + MochiReq}), + + MochiReq1. + + +%% -------------------- +%% gen_server functions +%% -------------------- + +init(_) -> + process_flag(trap_exit, true), + + % init state + VHosts = make_vhosts(), + VHostGlobals = re:split( + couch_config:get("httpd", "vhost_global_handlers", ""), + ", ?", + [{return, list}] + ), + + % Set vhost fun + DefaultVHostFun = "{couch_httpd_vhost, redirect_to_vhost}", + VHostFun = couch_httpd:make_arity_2_fun( + couch_config:get("httpd", "redirect_vhost_handler", DefaultVHostFun) + ), + + + Self = self(), + % register for changes in vhosts section + ok = couch_config:register( + fun("vhosts") -> + ok = gen_server:call(Self, vhosts_changed, infinity) + end + ), + + % register for changes in vhost_global_handlers key + ok = couch_config:register( + fun("httpd", "vhost_global_handlers") -> + ok = gen_server:call(Self, vhosts_global_changed, infinity) + end + ), + + ok = couch_config:register( + fun("httpd", "redirect_vhost_handler") -> + ok = gen_server:call(Self, fun_changed, infinity) + end + ), + + {ok, #vhosts{ + vhost_globals = VHostGlobals, + vhosts = VHosts, + vhost_fun = VHostFun} + }. + + +handle_call({match_vhost, MochiReq}, _From, State) -> + #vhosts{ + vhost_globals = VHostGlobals, + vhosts = VHosts, + vhost_fun = Fun + } = State, + + {"/" ++ VPath, Query, Fragment} = mochiweb_util:urlsplit_path(MochiReq:get(raw_path)), + VPathParts = string:tokens(VPath, "/"), + + XHost = couch_config:get("httpd", "x_forwarded_host", "X-Forwarded-Host"), + VHost = case MochiReq:get_header_value(XHost) of + undefined -> + case MochiReq:get_header_value("Host") of + undefined -> []; + Value1 -> Value1 + end; + Value -> Value + end, + {VHostParts, VhostPort} = split_host_port(VHost), + FinalMochiReq = case try_bind_vhost(VHosts, lists:reverse(VHostParts), + VhostPort, VPathParts) of + no_vhost_matched -> MochiReq; + {VhostTarget, NewPath} -> + case vhost_global(VHostGlobals, MochiReq) of + true -> + MochiReq; + _Else -> + NewPath1 = mochiweb_util:urlunsplit_path({NewPath, Query, + Fragment}), + MochiReq1 = mochiweb_request:new(MochiReq:get(socket), + MochiReq:get(method), + NewPath1, + MochiReq:get(version), + MochiReq:get(headers)), + Fun(MochiReq1, VhostTarget) + end + end, + {reply, {ok, FinalMochiReq}, State}; + +% update vhosts +handle_call(vhosts_changed, _From, State) -> + {reply, ok, State#vhosts{vhosts= make_vhosts()}}; + + +% update vhosts_globals +handle_call(vhosts_global_changed, _From, State) -> + VHostGlobals = re:split( + couch_config:get("httpd", "vhost_global_handlers", ""), + ", ?", + [{return, list}] + ), + {reply, ok, State#vhosts{vhost_globals=VHostGlobals}}; +% change fun +handle_call(fun_changed, _From, State) -> + DefaultVHostFun = "{couch_httpd_vhosts, redirect_to_vhost}", + VHostFun = couch_httpd:make_arity_2_fun( + couch_config:get("httpd", "redirect_vhost_handler", DefaultVHostFun) + ), + {reply, ok, State#vhosts{vhost_fun=VHostFun}}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + + +% default redirect vhost handler + +redirect_to_vhost(MochiReq, VhostTarget) -> + Path = MochiReq:get(raw_path), + Target = VhostTarget ++ Path, + + ?LOG_DEBUG("Vhost Target: '~p'~n", [Target]), + + Headers = mochiweb_headers:enter("x-couchdb-vhost-path", Path, + MochiReq:get(headers)), + + % build a new mochiweb request + MochiReq1 = mochiweb_request:new(MochiReq:get(socket), + MochiReq:get(method), + Target, + MochiReq:get(version), + Headers), + % cleanup, It force mochiweb to reparse raw uri. + MochiReq1:cleanup(), + + MochiReq1. + +%% if so, then it will not be rewritten, but will run as a normal couchdb request. +%* normally you'd use this for _uuids _utils and a few of the others you want to +%% keep available on vhosts. You can also use it to make databases 'global'. +vhost_global( VhostGlobals, MochiReq) -> + RawUri = MochiReq:get(raw_path), + {"/" ++ Path, _, _} = mochiweb_util:urlsplit_path(RawUri), + + Front = case couch_httpd:partition(Path) of + {"", "", ""} -> + "/"; % Special case the root url handler + {FirstPart, _, _} -> + FirstPart + end, + [true] == [true||V <- VhostGlobals, V == Front]. + +%% bind host +%% first it try to bind the port then the hostname. +try_bind_vhost([], _HostParts, _Port, _PathParts) -> + no_vhost_matched; +try_bind_vhost([VhostSpec|Rest], HostParts, Port, PathParts) -> + {{VHostParts, VPort, VPath}, Path} = VhostSpec, + case bind_port(VPort, Port) of + ok -> + case bind_vhost(lists:reverse(VHostParts), HostParts, []) of + {ok, Bindings, Remainings} -> + case bind_path(VPath, PathParts) of + {ok, PathParts1} -> + Path1 = make_target(Path, Bindings, Remainings, []), + {make_path(Path1), make_path(PathParts1)}; + fail -> + try_bind_vhost(Rest, HostParts, Port, + PathParts) + end; + fail -> try_bind_vhost(Rest, HostParts, Port, PathParts) + end; + fail -> try_bind_vhost(Rest, HostParts, Port, PathParts) + end. + +%% doc: build new patch from bindings. bindings are query args +%% (+ dynamic query rewritten if needed) and bindings found in +%% bind_path step. +%% TODO: merge code wit rewrite. But we need to make sure we are +%% in string here. +make_target([], _Bindings, _Remaining, Acc) -> + lists:reverse(Acc); +make_target([?MATCH_ALL], _Bindings, Remaining, Acc) -> + Acc1 = lists:reverse(Acc) ++ Remaining, + Acc1; +make_target([?MATCH_ALL|_Rest], _Bindings, Remaining, Acc) -> + Acc1 = lists:reverse(Acc) ++ Remaining, + Acc1; +make_target([{bind, P}|Rest], Bindings, Remaining, Acc) -> + P2 = case couch_util:get_value({bind, P}, Bindings) of + undefined -> "undefined"; + P1 -> P1 + end, + make_target(Rest, Bindings, Remaining, [P2|Acc]); +make_target([P|Rest], Bindings, Remaining, Acc) -> + make_target(Rest, Bindings, Remaining, [P|Acc]). + +%% bind port +bind_port(Port, Port) -> ok; +bind_port('*', _) -> ok; +bind_port(_,_) -> fail. + +%% bind bhost +bind_vhost([],[], Bindings) -> {ok, Bindings, []}; +bind_vhost([?MATCH_ALL], [], _Bindings) -> fail; +bind_vhost([?MATCH_ALL], Rest, Bindings) -> {ok, Bindings, Rest}; +bind_vhost([], _HostParts, _Bindings) -> fail; +bind_vhost([{bind, Token}|Rest], [Match|RestHost], Bindings) -> + bind_vhost(Rest, RestHost, [{{bind, Token}, Match}|Bindings]); +bind_vhost([Cname|Rest], [Cname|RestHost], Bindings) -> + bind_vhost(Rest, RestHost, Bindings); +bind_vhost(_, _, _) -> fail. + +%% bind path +bind_path([], PathParts) -> + {ok, PathParts}; +bind_path(_VPathParts, []) -> + fail; +bind_path([Path|VRest],[Path|Rest]) -> + bind_path(VRest, Rest); +bind_path(_, _) -> + fail. + +% utilities + + +%% create vhost list from ini +make_vhosts() -> + Vhosts = lists:foldl(fun({Vhost, Path}, Acc) -> + [{parse_vhost(Vhost), split_path(Path)}|Acc] + end, [], couch_config:get("vhosts")), + lists:reverse(lists:usort(Vhosts)). + +parse_vhost(Vhost) -> + case urlsplit_netloc(Vhost, []) of + {[], Path} -> + {make_spec("*", []), '*', Path}; + {HostPort, []} -> + {H, P} = split_host_port(HostPort), + H1 = make_spec(H, []), + {H1, P, []}; + {HostPort, Path} -> + {H, P} = split_host_port(HostPort), + H1 = make_spec(H, []), + {H1, P, string:tokens(Path, "/")} + end. + + +split_host_port(HostAsString) -> + case string:rchr(HostAsString, $:) of + 0 -> + {split_host(HostAsString), '*'}; + N -> + HostPart = string:substr(HostAsString, 1, N-1), + case (catch erlang:list_to_integer(HostAsString, N+1, + length(HostAsString))) of + {'EXIT', _} -> + {split_host(HostAsString), '*'}; + Port -> + {split_host(HostPart), Port} + end + end. + +split_host(HostAsString) -> + string:tokens(HostAsString, "\."). + +split_path(Path) -> + make_spec(string:tokens(Path, "/"), []). + + +make_spec([], Acc) -> + lists:reverse(Acc); +make_spec([""|R], Acc) -> + make_spec(R, Acc); +make_spec(["*"|R], Acc) -> + make_spec(R, [?MATCH_ALL|Acc]); +make_spec([P|R], Acc) -> + P1 = parse_var(P), + make_spec(R, [P1|Acc]). + + +parse_var(P) -> + case P of + ":" ++ Var -> + {bind, Var}; + _ -> P + end. + + +% mochiweb doesn't export it. +urlsplit_netloc("", Acc) -> + {lists:reverse(Acc), ""}; +urlsplit_netloc(Rest=[C | _], Acc) when C =:= $/; C =:= $?; C =:= $# -> + {lists:reverse(Acc), Rest}; +urlsplit_netloc([C | Rest], Acc) -> + urlsplit_netloc(Rest, [C | Acc]). + +make_path(Parts) -> + "/" ++ string:join(Parts,[?SEPARATOR]). diff --git a/apps/couch/src/couch_httpd_view.erl b/apps/couch/src/couch_httpd_view.erl index cb387d1b..b71fc2c6 100644 --- a/apps/couch/src/couch_httpd_view.erl +++ b/apps/couch/src/couch_httpd_view.erl @@ -15,10 +15,10 @@ -export([handle_view_req/3,handle_temp_view_req/2]). --export([get_stale_type/1, get_reduce_type/1, parse_view_params/3]). --export([make_view_fold_fun/7, finish_view_fold/4, finish_view_fold/5, view_row_obj/3]). --export([view_group_etag/2, view_group_etag/3, make_reduce_fold_funs/6]). --export([design_doc_view/5, parse_bool_param/1, doc_member/2]). +-export([parse_view_params/3]). +-export([make_view_fold_fun/7, finish_view_fold/4, finish_view_fold/5, view_row_obj/4]). +-export([view_etag/3, view_etag/4, make_reduce_fold_funs/6]). +-export([design_doc_view/5, parse_bool_param/1, doc_member/3]). -export([make_key_options/1, load_view/4]). -import(couch_httpd, @@ -57,7 +57,8 @@ design_doc_view(Req, Db, DName, ViewName, Keys) -> handle_view_req(#httpd{method='GET', path_parts=[_, _, DName, _, ViewName]}=Req, Db, _DDoc) -> - design_doc_view(Req, Db, DName, ViewName, nil); + Keys = couch_httpd:qs_json_value(Req, "keys", nil), + design_doc_view(Req, Db, DName, ViewName, Keys); handle_view_req(#httpd{method='POST', path_parts=[_, _, DName, _, ViewName]}=Req, Db, _DDoc) -> @@ -113,7 +114,7 @@ output_map_view(Req, View, Group, Db, QueryArgs, nil) -> limit = Limit, skip = SkipCount } = QueryArgs, - CurrentEtag = view_group_etag(Group, Db), + CurrentEtag = view_etag(Db, Group, View), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> {ok, RowCount} = couch_view:get_row_count(View), FoldlFun = make_view_fold_fun(Req, QueryArgs, CurrentEtag, Db, Group#group.current_seq, RowCount, #view_fold_helper_funs{reduce_count=fun couch_view:reduce_to_count/1}), @@ -129,7 +130,7 @@ output_map_view(Req, View, Group, Db, QueryArgs, Keys) -> limit = Limit, skip = SkipCount } = QueryArgs, - CurrentEtag = view_group_etag(Group, Db, Keys), + CurrentEtag = view_etag(Db, Group, View, Keys), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> {ok, RowCount} = couch_view:get_row_count(View), FoldAccInit = {Limit, SkipCount, undefined, []}, @@ -154,7 +155,7 @@ output_reduce_view(Req, Db, View, Group, QueryArgs, nil) -> skip = Skip, group_level = GroupLevel } = QueryArgs, - CurrentEtag = view_group_etag(Group, Db), + CurrentEtag = view_etag(Db, Group, View), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> {ok, GroupRowsFun, RespFun} = make_reduce_fold_funs(Req, GroupLevel, QueryArgs, CurrentEtag, Group#group.current_seq, @@ -172,7 +173,7 @@ output_reduce_view(Req, Db, View, Group, QueryArgs, Keys) -> skip = Skip, group_level = GroupLevel } = QueryArgs, - CurrentEtag = view_group_etag(Group, Db, Keys), + CurrentEtag = view_etag(Db, Group, View, Keys), couch_httpd:etag_respond(Req, CurrentEtag, fun() -> {ok, GroupRowsFun, RespFun} = make_reduce_fold_funs(Req, GroupLevel, QueryArgs, CurrentEtag, Group#group.current_seq, @@ -246,7 +247,7 @@ parse_view_params(Req, Keys, ViewType) -> QueryArgs = lists:foldl(fun({K, V}, Args2) -> validate_view_query(K, V, Args2) end, Args, lists:reverse(QueryParams)), % Reverse to match QS order. - + warn_on_empty_key_range(QueryArgs), GroupLevel = QueryArgs#view_query_args.group_level, case {ViewType, GroupLevel, IsMultiGet} of {reduce, exact, true} -> @@ -268,22 +269,37 @@ parse_view_param("", _) -> parse_view_param("key", Value) -> JsonKey = ?JSON_DECODE(Value), [{start_key, JsonKey}, {end_key, JsonKey}]; +% TODO: maybe deprecate startkey_docid parse_view_param("startkey_docid", Value) -> [{start_docid, ?l2b(Value)}]; +parse_view_param("start_key_doc_id", Value) -> + [{start_docid, ?l2b(Value)}]; +% TODO: maybe deprecate endkey_docid parse_view_param("endkey_docid", Value) -> [{end_docid, ?l2b(Value)}]; +parse_view_param("end_key_doc_id", Value) -> + [{end_docid, ?l2b(Value)}]; +% TODO: maybe deprecate startkey parse_view_param("startkey", Value) -> [{start_key, ?JSON_DECODE(Value)}]; +parse_view_param("start_key", Value) -> + [{start_key, ?JSON_DECODE(Value)}]; +% TODO: maybe deprecate endkey parse_view_param("endkey", Value) -> [{end_key, ?JSON_DECODE(Value)}]; +parse_view_param("end_key", Value) -> + [{end_key, ?JSON_DECODE(Value)}]; parse_view_param("limit", Value) -> [{limit, parse_positive_int_param(Value)}]; parse_view_param("count", _Value) -> throw({query_parse_error, <<"Query parameter 'count' is now 'limit'.">>}); parse_view_param("stale", "ok") -> [{stale, ok}]; +parse_view_param("stale", "update_after") -> + [{stale, update_after}]; parse_view_param("stale", _Value) -> - throw({query_parse_error, <<"stale only available as stale=ok">>}); + throw({query_parse_error, + <<"stale only available as stale=ok or as stale=update_after">>}); parse_view_param("update", _Value) -> throw({query_parse_error, <<"update=false is now stale=ok">>}); parse_view_param("descending", Value) -> @@ -303,6 +319,8 @@ parse_view_param("reduce", Value) -> [{reduce, parse_bool_param(Value)}]; parse_view_param("include_docs", Value) -> [{include_docs, parse_bool_param(Value)}]; +parse_view_param("conflicts", Value) -> + [{conflicts, parse_bool_param(Value)}]; parse_view_param("list", Value) -> [{list, ?l2b(Value)}]; parse_view_param("callback", _) -> @@ -310,6 +328,26 @@ parse_view_param("callback", _) -> parse_view_param(Key, Value) -> [{extra, {Key, Value}}]. +warn_on_empty_key_range(#view_query_args{start_key=undefined}) -> + ok; +warn_on_empty_key_range(#view_query_args{end_key=undefined}) -> + ok; +warn_on_empty_key_range(#view_query_args{start_key=A, end_key=A}) -> + ok; +warn_on_empty_key_range(#view_query_args{ + start_key=StartKey, end_key=EndKey, direction=Dir}) -> + case {Dir, couch_view:less_json(StartKey, EndKey)} of + {fwd, false} -> + throw({query_parse_error, + <<"No rows can match your key range, reverse your ", + "start_key and end_key or set descending=true">>}); + {rev, true} -> + throw({query_parse_error, + <<"No rows can match your key range, reverse your ", + "start_key and end_key or set descending=false">>}); + _ -> ok + end. + validate_view_query(start_key, Value, Args) -> case Args#view_query_args.multi_get of true -> @@ -336,6 +374,10 @@ validate_view_query(limit, Value, Args) -> Args#view_query_args{limit=Value}; validate_view_query(list, Value, Args) -> Args#view_query_args{list=Value}; +validate_view_query(stale, ok, Args) -> + Args#view_query_args{stale=ok}; +validate_view_query(stale, update_after, Args) -> + Args#view_query_args{stale=update_after}; validate_view_query(stale, _, Args) -> Args; validate_view_query(descending, true, Args) -> @@ -387,6 +429,15 @@ validate_view_query(include_docs, true, Args) -> % Use the view_query_args record's default value validate_view_query(include_docs, _Value, Args) -> Args; +validate_view_query(conflicts, true, Args) -> + case Args#view_query_args.view_type of + reduce -> + Msg = <<"Query parameter `conflicts` " + "is invalid for reduce views.">>, + throw({query_parse_error, Msg}); + _ -> + Args#view_query_args{conflicts = true} + end; validate_view_query(extra, _Value, Args) -> Args. @@ -398,7 +449,8 @@ make_view_fold_fun(Req, QueryArgs, Etag, Db, UpdateSeq, TotalViewCount, HelperFu } = apply_default_helper_funs(HelperFuns), #view_query_args{ - include_docs = IncludeDocs + include_docs = IncludeDocs, + conflicts = Conflicts } = QueryArgs, fun({{Key, DocId}, Value}, OffsetReds, @@ -416,12 +468,12 @@ make_view_fold_fun(Req, QueryArgs, Etag, Db, UpdateSeq, TotalViewCount, HelperFu {ok, Resp2, RowFunAcc0} = StartRespFun(Req, Etag, TotalViewCount, Offset, RowFunAcc, UpdateSeq), {Go, RowFunAcc2} = SendRowFun(Resp2, Db, {{Key, DocId}, Value}, - IncludeDocs, RowFunAcc0), + IncludeDocs, Conflicts, RowFunAcc0), {Go, {AccLimit - 1, 0, Resp2, RowFunAcc2}}; {AccLimit, _, Resp} when (AccLimit > 0) -> % rendering all other rows {Go, RowFunAcc2} = SendRowFun(Resp, Db, {{Key, DocId}, Value}, - IncludeDocs, RowFunAcc), + IncludeDocs, Conflicts, RowFunAcc), {Go, {AccLimit - 1, 0, Resp, RowFunAcc2}} end end. @@ -497,7 +549,7 @@ apply_default_helper_funs( end, SendRow2 = case SendRow of - undefined -> fun send_json_view_row/5; + undefined -> fun send_json_view_row/6; _ -> SendRow end, @@ -570,8 +622,8 @@ json_view_start_resp(Req, Etag, TotalViewCount, Offset, _Acc, UpdateSeq) -> end, {ok, Resp, BeginBody}. -send_json_view_row(Resp, Db, {{Key, DocId}, Value}, IncludeDocs, RowFront) -> - JsonObj = view_row_obj(Db, {{Key, DocId}, Value}, IncludeDocs), +send_json_view_row(Resp, Db, Kv, IncludeDocs, Conflicts, RowFront) -> + JsonObj = view_row_obj(Db, Kv, IncludeDocs, Conflicts), send_chunk(Resp, RowFront ++ ?JSON_ENCODE(JsonObj)), {ok, ",\r\n"}. @@ -588,22 +640,21 @@ send_json_reduce_row(Resp, {Key, Value}, RowFront) -> send_chunk(Resp, RowFront ++ ?JSON_ENCODE({[{key, Key}, {value, Value}]})), {ok, ",\r\n"}. -view_group_etag(Group, Db) -> - view_group_etag(Group, Db, nil). +view_etag(Db, Group, View) -> + view_etag(Db, Group, View, nil). -view_group_etag(#group{sig=Sig,current_seq=CurrentSeq}, _Db, Extra) -> - % ?LOG_ERROR("Group ~p",[Group]), - % This is not as granular as it could be. - % If there are updates to the db that do not effect the view index, - % they will change the Etag. For more granular Etags we'd need to keep - % track of the last Db seq that caused an index change. - couch_httpd:make_etag({Sig, CurrentSeq, Extra}). +view_etag(Db, Group, {reduce, _, _, View}, Extra) -> + view_etag(Db, Group, View, Extra); +view_etag(Db, Group, {temp_reduce, View}, Extra) -> + view_etag(Db, Group, View, Extra); +view_etag(_Db, #group{sig=Sig}, #view{update_seq=UpdateSeq, purge_seq=PurgeSeq}, Extra) -> + couch_httpd:make_etag({Sig, UpdateSeq, PurgeSeq, Extra}). % the view row has an error -view_row_obj(_Db, {{Key, error}, Value}, _IncludeDocs) -> +view_row_obj(_Db, {{Key, error}, Value}, _IncludeDocs, _Conflicts) -> {[{key, Key}, {error, Value}]}; % include docs in the view output -view_row_obj(Db, {{Key, DocId}, {Props}}, true) -> +view_row_obj(Db, {{Key, DocId}, {Props}}, true, Conflicts) -> Rev = case couch_util:get_value(<<"_rev">>, Props) of undefined -> nil; @@ -611,19 +662,29 @@ view_row_obj(Db, {{Key, DocId}, {Props}}, true) -> couch_doc:parse_rev(Rev0) end, IncludeId = couch_util:get_value(<<"_id">>, Props, DocId), - view_row_with_doc(Db, {{Key, DocId}, {Props}}, {IncludeId, Rev}); -view_row_obj(Db, {{Key, DocId}, Value}, true) -> - view_row_with_doc(Db, {{Key, DocId}, Value}, {DocId, nil}); + view_row_with_doc(Db, {{Key, DocId}, {Props}}, {IncludeId, Rev}, Conflicts); +view_row_obj(Db, {{Key, DocId}, Value}, true, Conflicts) -> + view_row_with_doc(Db, {{Key, DocId}, Value}, {DocId, nil}, Conflicts); % the normal case for rendering a view row -view_row_obj(_Db, {{Key, DocId}, Value}, _IncludeDocs) -> +view_row_obj(_Db, {{Key, DocId}, Value}, _IncludeDocs, _Conflicts) -> {[{id, DocId}, {key, Key}, {value, Value}]}. -view_row_with_doc(Db, {{Key, DocId}, Value}, IdRev) -> - {[{id, DocId}, {key, Key}, {value, Value}] ++ doc_member(Db, IdRev)}. +view_row_with_doc(Db, {{Key, DocId}, Value}, IdRev, Conflicts) -> + {[{id, DocId}, {key, Key}, {value, Value}] ++ + doc_member(Db, IdRev, if Conflicts -> [conflicts]; true -> [] end)}. -doc_member(Db, {DocId, Rev}) -> +doc_member(Db, #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = Info, + Options) -> + ?LOG_DEBUG("Include Doc: ~p ~p", [Id, Rev]), + case couch_db:open_doc(Db, Info, [deleted | Options]) of + {ok, Doc} -> + [{doc, couch_doc:to_json_obj(Doc, [])}]; + _ -> + [{doc, null}] + end; +doc_member(Db, {DocId, Rev}, Options) -> ?LOG_DEBUG("Include Doc: ~p ~p", [DocId, Rev]), - case (catch couch_httpd_db:couch_doc_open(Db, DocId, Rev, [])) of + case (catch couch_httpd_db:couch_doc_open(Db, DocId, Rev, Options)) of #doc{} = Doc -> JsonDoc = couch_doc:to_json_obj(Doc, []), [{doc, JsonDoc}]; diff --git a/apps/couch/src/couch_js_functions.hrl b/apps/couch/src/couch_js_functions.hrl deleted file mode 100644 index 32573a90..00000000 --- a/apps/couch/src/couch_js_functions.hrl +++ /dev/null @@ -1,97 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --define(AUTH_DB_DOC_VALIDATE_FUNCTION, <<" - function(newDoc, oldDoc, userCtx) { - if (newDoc._deleted === true) { - // allow deletes by admins and matching users - // without checking the other fields - if ((userCtx.roles.indexOf('_admin') !== -1) || - (userCtx.name == oldDoc.name)) { - return; - } else { - throw({forbidden: 'Only admins may delete other user docs.'}); - } - } - - if ((oldDoc && oldDoc.type !== 'user') || newDoc.type !== 'user') { - throw({forbidden : 'doc.type must be user'}); - } // we only allow user docs for now - - if (!newDoc.name) { - throw({forbidden: 'doc.name is required'}); - } - - if (newDoc.roles && !isArray(newDoc.roles)) { - throw({forbidden: 'doc.roles must be an array'}); - } - - if (newDoc._id !== ('org.couchdb.user:' + newDoc.name)) { - throw({ - forbidden: 'Doc ID must be of the form org.couchdb.user:name' - }); - } - - if (oldDoc) { // validate all updates - if (oldDoc.name !== newDoc.name) { - throw({forbidden: 'Usernames can not be changed.'}); - } - } - - if (newDoc.password_sha && !newDoc.salt) { - throw({ - forbidden: 'Users with password_sha must have a salt.' + - 'See /_utils/script/couch.js for example code.' - }); - } - - if (userCtx.roles.indexOf('_admin') === -1) { - if (oldDoc) { // validate non-admin updates - if (userCtx.name !== newDoc.name) { - throw({ - forbidden: 'You may only update your own user document.' - }); - } - // validate role updates - var oldRoles = oldDoc.roles.sort(); - var newRoles = newDoc.roles.sort(); - - if (oldRoles.length !== newRoles.length) { - throw({forbidden: 'Only _admin may edit roles'}); - } - - for (var i = 0; i < oldRoles.length; i++) { - if (oldRoles[i] !== newRoles[i]) { - throw({forbidden: 'Only _admin may edit roles'}); - } - } - } else if (newDoc.roles.length > 0) { - throw({forbidden: 'Only _admin may set roles'}); - } - } - - // no system roles in users db - for (var i = 0; i < newDoc.roles.length; i++) { - if (newDoc.roles[i][0] === '_') { - throw({ - forbidden: - 'No system roles (starting with underscore) in users db.' - }); - } - } - - // no system names as names - if (newDoc.name[0] === '_') { - throw({forbidden: 'Username may not start with underscore.'}); - } - } -">>). diff --git a/apps/couch/src/couch_key_tree.erl b/apps/couch/src/couch_key_tree.erl index 8b574309..2f3c6abf 100644 --- a/apps/couch/src/couch_key_tree.erl +++ b/apps/couch/src/couch_key_tree.erl @@ -10,6 +10,41 @@ % License for the specific language governing permissions and limitations under % the License. +%% @doc Data structure used to represent document edit histories. + +%% A key tree is used to represent the edit history of a document. Each node of +%% the tree represents a particular version. Relations between nodes represent +%% the order that these edits were applied. For instance, a set of three edits +%% would produce a tree of versions A->B->C indicating that edit C was based on +%% version B which was in turn based on A. In a world without replication (and +%% no ability to disable MVCC checks), all histories would be forced to be +%% linear lists of edits due to constraints imposed by MVCC (ie, new edits must +%% be based on the current version). However, we have replication, so we must +%% deal with not so easy cases, which lead to trees. +%% +%% Consider a document in state A. This doc is replicated to a second node. We +%% then edit the document on each node leaving it in two different states, B +%% and C. We now have two key trees, A->B and A->C. When we go to replicate a +%% second time, the key tree must combine these two trees which gives us +%% A->(B|C). This is how conflicts are introduced. In terms of the key tree, we +%% say that we have two leaves (B and C) that are not deleted. The presense of +%% the multiple leaves indicate conflict. To remove a conflict, one of the +%% edits (B or C) can be deleted, which results in, A->(B|C->D) where D is an +%% edit that is specially marked with the a deleted=true flag. +%% +%% What makes this a bit more complicated is that there is a limit to the +%% number of revisions kept, specified in couch_db.hrl (default is 1000). When +%% this limit is exceeded only the last 1000 are kept. This comes in to play +%% when branches are merged. The comparison has to begin at the same place in +%% the branches. A revision id is of the form N-XXXXXXX where N is the current +%% revision. So each path will have a start number, calculated in +%% couch_doc:to_path using the formula N - length(RevIds) + 1 So, .eg. if a doc +%% was edit 1003 times this start number would be 4, indicating that 3 +%% revisions were truncated. +%% +%% This comes into play in @see merge_at/3 which recursively walks down one +%% tree or the other until they begin at the same revision. + -module(couch_key_tree). -export([merge/3, find_missing/2, get_key_leafs/2, @@ -32,6 +67,9 @@ merge(Paths, Path, Depth) -> {Merged, Conflicts} = merge(Paths, Path), {stem(Merged, Depth), Conflicts}. +%% @doc Merge a path with an existing list of paths, returning a new list of +%% paths. A return of conflicts indicates a new conflict was discovered in this +%% merge. Conflicts may already exist in the original list of paths. -spec merge([path()], path()) -> {[path()], conflicts | no_conflicts}. merge(Paths, Path) -> {ok, Merged, HasConflicts} = merge_one(Paths, Path, [], false), @@ -44,8 +82,8 @@ merge(Paths, Path) -> end, {lists:sort(Merged), Conflicts}. --spec merge_one(Original::[path()], Inserted::path(), [path()], bool()) -> - {ok, Merged::[path()], NewConflicts::bool()}. +-spec merge_one(Original::[path()], Inserted::path(), [path()], boolean()) -> + {ok, Merged::[path()], NewConflicts::boolean()}. merge_one([], Insert, OutAcc, ConflictsAcc) -> {ok, [Insert | OutAcc], ConflictsAcc}; merge_one([{Start, Tree}|Rest], {StartInsert, TreeInsert}, Acc, HasConflicts) -> @@ -59,7 +97,7 @@ merge_one([{Start, Tree}|Rest], {StartInsert, TreeInsert}, Acc, HasConflicts) -> end. -spec merge_at(tree(), Place::integer(), tree()) -> - {ok, Merged::tree(), HasConflicts::bool()} | no. + {ok, Merged::tree(), HasConflicts::boolean()} | no. merge_at(_Ours, _Place, []) -> no; merge_at([], _Place, _Insert) -> @@ -70,6 +108,7 @@ merge_at([{Key, Value, SubTree}|Sibs], Place, InsertTree) when Place > 0 -> {ok, Merged, Conflicts} -> {ok, [{Key, Value, Merged} | Sibs], Conflicts}; no -> + % first branch didn't merge, move to next branch case merge_at(Sibs, Place, InsertTree) of {ok, Merged, Conflicts} -> {ok, [{Key, Value, SubTree} | Merged], Conflicts}; @@ -85,9 +124,9 @@ merge_at(OurTree, Place, [{Key, Value, SubTree}]) when Place < 0 -> no -> no end; -merge_at([{Key, Value, SubTree}|Sibs], 0, [{Key, _Value, InsertSubTree}]) -> +merge_at([{Key, V1, SubTree}|Sibs], 0, [{Key, V2, InsertSubTree}]) -> {Merged, Conflicts} = merge_simple(SubTree, InsertSubTree), - {ok, [{Key, Value, Merged} | Sibs], Conflicts}; + {ok, [{Key, value_pref(V1, V2), Merged} | Sibs], Conflicts}; merge_at([{OurKey, _, _} | _], 0, [{Key, _, _}]) when OurKey > Key -> % siblings keys are ordered, no point in continuing no; @@ -101,21 +140,23 @@ merge_at([Tree | Sibs], 0, InsertTree) -> % key tree functions --spec merge_simple(tree(), tree()) -> {Merged::tree(), NewConflicts::bool()}. +-spec merge_simple(tree(), tree()) -> {Merged::tree(), NewConflicts::boolean()}. merge_simple([], B) -> {B, false}; merge_simple(A, []) -> {A, false}; -merge_simple([{Key, Value, SubA} | NextA], [{Key, _, SubB} | NextB]) -> +merge_simple([{Key, V1, SubA} | NextA], [{Key, V2, SubB} | NextB]) -> {MergedSubTree, Conflict1} = merge_simple(SubA, SubB), {MergedNextTree, Conflict2} = merge_simple(NextA, NextB), + Value = value_pref(V1, V2), {[{Key, Value, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2}; merge_simple([{A, _, _} = Tree | Next], [{B, _, _} | _] = Insert) when A < B -> - {Merged, _} = merge_simple(Next, Insert), - {[Tree | Merged], true}; + {Merged, Conflict} = merge_simple(Next, Insert), + % if Merged has more branches than the input we added a new conflict + {[Tree | Merged], Conflict orelse (length(Merged) > length(Next))}; merge_simple(Ours, [Tree | Next]) -> - {Merged, _} = merge_simple(Ours, Next), - {[Tree | Merged], true}. + {Merged, Conflict} = merge_simple(Ours, Next), + {[Tree | Merged], Conflict orelse (length(Merged) > length(Next))}. find_missing(_Tree, []) -> []; @@ -160,14 +201,18 @@ remove_leafs(Trees, Keys) -> % filter out any that are in the keys list. {FilteredPaths, RemovedKeys} = filter_leafs(Paths, Keys, [], []), + SortedPaths = lists:sort( + [{Pos + 1 - length(Path), Path} || {Pos, Path} <- FilteredPaths] + ), + % convert paths back to trees NewTree = lists:foldl( - fun({PathPos, Path},TreeAcc) -> + fun({StartPos, Path},TreeAcc) -> [SingleTree] = lists:foldl( fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path), - {NewTrees, _} = merge(TreeAcc, {PathPos + 1 - length(Path), SingleTree}), + {NewTrees, _} = merge(TreeAcc, {StartPos, SingleTree}), NewTrees - end, [], FilteredPaths), + end, [], SortedPaths), {NewTree, RemovedKeys}. @@ -364,19 +409,35 @@ map_leafs_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree]) -> stem(Trees, Limit) -> - % flatten each branch in a tree into a tree path - Paths = get_all_leafs_full(Trees), - - Paths2 = [{Pos, lists:sublist(Path, Limit)} || {Pos, Path} <- Paths], + % flatten each branch in a tree into a tree path, sort by starting rev # + Paths = lists:sort(lists:map(fun({Pos, Path}) -> + StemmedPath = lists:sublist(Path, Limit), + {Pos + 1 - length(StemmedPath), StemmedPath} + end, get_all_leafs_full(Trees))), % convert paths back to trees lists:foldl( - fun({PathPos, Path},TreeAcc) -> + fun({StartPos, Path},TreeAcc) -> [SingleTree] = lists:foldl( fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path), - {NewTrees, _} = merge(TreeAcc, {PathPos + 1 - length(Path), SingleTree}), + {NewTrees, _} = merge(TreeAcc, {StartPos, SingleTree}), NewTrees - end, [], Paths2). + end, [], Paths). + + +value_pref(Tuple, _) when is_tuple(Tuple), + (tuple_size(Tuple) == 3 orelse tuple_size(Tuple) == 4) -> + Tuple; +value_pref(_, Tuple) when is_tuple(Tuple), + (tuple_size(Tuple) == 3 orelse tuple_size(Tuple) == 4) -> + Tuple; +value_pref(?REV_MISSING, Other) -> + Other; +value_pref(Other, ?REV_MISSING) -> + Other; +value_pref(Last, _) -> + Last. + % Tests moved to test/etap/06?-*.t diff --git a/apps/couch/src/couch_log.erl b/apps/couch/src/couch_log.erl index 80ce0600..9bac7450 100644 --- a/apps/couch/src/couch_log.erl +++ b/apps/couch/src/couch_log.erl @@ -14,6 +14,7 @@ -behaviour(gen_event). -export([start_link/0,stop/0]). +-export([debug/2, info/2, error/2]). -export([debug_on/0,info_on/0,get_level/0,get_level_integer/0, set_level/1]). -export([init/1, handle_event/2, terminate/2, code_change/3, handle_info/2, handle_call/2]). -export([read/2]). @@ -23,6 +24,29 @@ -define(LEVEL_DEBUG, 1). -define(LEVEL_TMI, 0). +debug(Format, Args) -> + case debug_on() of + false -> + ok; + true -> + {ConsoleMsg, FileMsg} = get_log_messages(self(), debug, Format, Args), + gen_event:sync_notify(error_logger, {couch_debug, ConsoleMsg, FileMsg}) + end. + +info(Format, Args) -> + case info_on() of + false -> + ok; + true -> + {ConsoleMsg, FileMsg} = get_log_messages(self(), info, Format, Args), + gen_event:sync_notify(error_logger, {couch_info, ConsoleMsg, FileMsg}) + end. + +error(Format, Args) -> + {ConsoleMsg, FileMsg} = get_log_messages(self(), error, Format, Args), + gen_event:sync_notify(error_logger, {couch_error, ConsoleMsg, FileMsg}). + + level_integer(error) -> ?LEVEL_ERROR; level_integer(info) -> ?LEVEL_INFO; level_integer(debug) -> ?LEVEL_DEBUG; @@ -65,8 +89,14 @@ init([]) -> end, ets:insert(?MODULE, {level, Level}), - {ok, Fd} = file:open(Filename, [append]), - {ok, {Fd, Level, Sasl}}. + case file:open(Filename, [append]) of + {ok, Fd} -> + {ok, {Fd, Level, Sasl}}; + {error, eacces} -> + {stop, {file_permission_error, Filename}}; + Error -> + {stop, Error} + end. debug_on() -> get_level_integer() =< ?LEVEL_DEBUG. @@ -90,29 +120,32 @@ get_level_integer() -> set_level_integer(Int) -> gen_event:call(error_logger, couch_log, {set_level_integer, Int}). -handle_event({Pid, couch_error, Id, {Format, Args}}, {Fd, _, _}=State) -> - log(Fd, Pid, error, Id, Format, Args), +handle_event({couch_error, ConMsg, FileMsg}, {Fd, _LogLevel, _Sasl}=State) -> + log(Fd, ConMsg, FileMsg), {ok, State}; -handle_event({Pid, couch_info, Id, {Format, Args}}, {Fd, LogLevel, _Sasl}=State) +handle_event({couch_info, ConMsg, FileMsg}, {Fd, LogLevel, _Sasl}=State) when LogLevel =< ?LEVEL_INFO -> - log(Fd, Pid, info, Id, Format, Args), + log(Fd, ConMsg, FileMsg), {ok, State}; -handle_event({Pid, couch_debug, Id, {Format, Args}}, {Fd, LogLevel, _Sasl}=State) +handle_event({couch_debug, ConMsg, FileMsg}, {Fd, LogLevel, _Sasl}=State) when LogLevel =< ?LEVEL_DEBUG -> - log(Fd, Pid, debug, Id, Format, Args), + log(Fd, ConMsg, FileMsg), {ok, State}; handle_event({error_report, _, {Pid, _, _}}=Event, {Fd, _LogLevel, Sasl}=State) when Sasl =/= false -> - log(Fd, Pid, error, undefined, "~p", [Event]), + {ConMsg, FileMsg} = get_log_messages(Pid, error, "~p", [Event]), + log(Fd, ConMsg, FileMsg), {ok, State}; handle_event({error, _, {Pid, Format, Args}}, {Fd, _LogLevel, Sasl}=State) when Sasl =/= false -> - log(Fd, Pid, error, undefined, Format, Args), + {ConMsg, FileMsg} = get_log_messages(Pid, error, Format, Args), + log(Fd, ConMsg, FileMsg), {ok, State}; handle_event({_, _, {Pid, _, _}}=Event, {Fd, LogLevel, _Sasl}=State) when LogLevel =< ?LEVEL_TMI -> % log every remaining event if tmi! - log(Fd, Pid, tmi, undefined, "~p", [Event]), + {ConMsg, FileMsg} = get_log_messages(Pid, tmi, "~p", [Event]), + log(Fd, ConMsg, FileMsg), {ok, State}; handle_event(_Event, State) -> {ok, State}. @@ -130,19 +163,23 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Arg, {Fd, _LoggingLevel, _Sasl}) -> file:close(Fd). -log(Fd, Pid, Level, undefined, Format, Args) -> - log(Fd, Pid, Level, "--------", Format, Args); -log(Fd, Pid, Level, Id, Format, Args) -> - Msg = io_lib:format(Format, Args), - ok = io:format("[~s] [~p] [~s] ~s~n", [Level, Pid, Id, Msg]), - Msg2 = re:replace(lists:flatten(Msg),"\\r\\n|\\r|\\n", "\r\n", - [global, {return, list}]), - ok = io:format(Fd, "[~s] [~s] [~p] [~s] ~s\r~n\r~n", - [httpd_util:rfc1123_date(), Level, Pid, Id, Msg2]). +log(Fd, ConsoleMsg, FileMsg) -> + ok = io:put_chars(ConsoleMsg), + ok = io:put_chars(Fd, FileMsg). + +get_log_messages(Pid, Level, Format, Args) -> + Nonce = case erlang:get(nonce) of + undefined -> "--------"; + Else -> Else + end, + ConsoleMsg = unicode:characters_to_binary(io_lib:format( + "[~s] [~p] [~s] " ++ Format ++ "~n", [Level, Pid, Nonce | Args])), + FileMsg = ["[", httpd_util:rfc1123_date(), "] ", ConsoleMsg], + {ConsoleMsg, iolist_to_binary(FileMsg)}. read(Bytes, Offset) -> LogFileName = couch_config:get("log", "file"), - LogFileSize = couch_util:file_read_size(LogFileName), + LogFileSize = filelib:file_size(LogFileName), {ok, Fd} = file:open(LogFileName, [read]), Start = lists:max([LogFileSize - Bytes, 0]) + Offset, diff --git a/apps/couch/src/couch_os_daemons.erl b/apps/couch/src/couch_os_daemons.erl new file mode 100644 index 00000000..d03f550c --- /dev/null +++ b/apps/couch/src/couch_os_daemons.erl @@ -0,0 +1,364 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. +-module(couch_os_daemons). +-behaviour(gen_server). + +-export([start_link/0, info/0, info/1, config_change/2]). + +-export([init/1, terminate/2, code_change/3]). +-export([handle_call/3, handle_cast/2, handle_info/2]). + +-include("couch_db.hrl"). + +-record(daemon, { + port, + name, + cmd, + kill, + status=running, + cfg_patterns=[], + errors=[], + buf=[] +}). + +-define(PORT_OPTIONS, [stream, {line, 1024}, binary, exit_status, hide]). +-define(TIMEOUT, 5000). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +info() -> + info([]). + +info(Options) -> + gen_server:call(?MODULE, {daemon_info, Options}). + +config_change(Section, Key) -> + gen_server:cast(?MODULE, {config_change, Section, Key}). + +init(_) -> + process_flag(trap_exit, true), + ok = couch_config:register(fun couch_os_daemons:config_change/2), + Table = ets:new(?MODULE, [protected, set, {keypos, #daemon.port}]), + reload_daemons(Table), + {ok, Table}. + +terminate(_Reason, Table) -> + [stop_port(D) || D <- ets:tab2list(Table)], + ok. + +handle_call({daemon_info, Options}, _From, Table) when is_list(Options) -> + case lists:member(table, Options) of + true -> + {reply, {ok, ets:tab2list(Table)}, Table}; + _ -> + {reply, {ok, Table}, Table} + end; +handle_call(Msg, From, Table) -> + ?LOG_ERROR("Unknown call message to ~p from ~p: ~p", [?MODULE, From, Msg]), + {stop, error, Table}. + +handle_cast({config_change, Sect, Key}, Table) -> + restart_daemons(Table, Sect, Key), + case Sect of + "os_daemons" -> reload_daemons(Table); + _ -> ok + end, + {noreply, Table}; +handle_cast(stop, Table) -> + {stop, normal, Table}; +handle_cast(Msg, Table) -> + ?LOG_ERROR("Unknown cast message to ~p: ~p", [?MODULE, Msg]), + {stop, error, Table}. + +handle_info({'EXIT', Port, Reason}, Table) -> + case ets:lookup(Table, Port) of + [] -> + ?LOG_INFO("Port ~p exited after stopping: ~p~n", [Port, Reason]); + [#daemon{status=stopping}] -> + true = ets:delete(Table, Port); + [#daemon{name=Name, status=restarting}=D] -> + ?LOG_INFO("Daemon ~P restarting after config change.", [Name]), + true = ets:delete(Table, Port), + {ok, Port2} = start_port(D#daemon.cmd), + true = ets:insert(Table, D#daemon{ + port=Port2, status=running, kill=undefined, buf=[] + }); + [#daemon{name=Name, status=halted}] -> + ?LOG_ERROR("Halted daemon process: ~p", [Name]); + [D] -> + ?LOG_ERROR("Invalid port state at exit: ~p", [D]) + end, + {noreply, Table}; +handle_info({Port, closed}, Table) -> + handle_info({Port, {exit_status, closed}}, Table); +handle_info({Port, {exit_status, Status}}, Table) -> + case ets:lookup(Table, Port) of + [] -> + ?LOG_ERROR("Unknown port ~p exiting ~p", [Port, Status]), + {stop, {error, unknown_port_died, Status}, Table}; + [#daemon{name=Name, status=restarting}=D] -> + ?LOG_INFO("Daemon ~P restarting after config change.", [Name]), + true = ets:delete(Table, Port), + {ok, Port2} = start_port(D#daemon.cmd), + true = ets:insert(Table, D#daemon{ + port=Port2, status=running, kill=undefined, buf=[] + }), + {noreply, Table}; + [#daemon{status=stopping}=D] -> + % The configuration changed and this daemon is no + % longer needed. + ?LOG_DEBUG("Port ~p shut down.", [D#daemon.name]), + true = ets:delete(Table, Port), + {noreply, Table}; + [D] -> + % Port died for unknown reason. Check to see if it's + % died too many times or if we should boot it back up. + case should_halt([now() | D#daemon.errors]) of + {true, _} -> + % Halting the process. We won't try and reboot + % until the configuration changes. + Fmt = "Daemon ~p halted with exit_status ~p", + ?LOG_ERROR(Fmt, [D#daemon.name, Status]), + D2 = D#daemon{status=halted, errors=nil, buf=nil}, + true = ets:insert(Table, D2), + {noreply, Table}; + {false, Errors} -> + % We're guessing it was a random error, this daemon + % has behaved so we'll give it another chance. + Fmt = "Daemon ~p is being rebooted after exit_status ~p", + ?LOG_INFO(Fmt, [D#daemon.name, Status]), + true = ets:delete(Table, Port), + {ok, Port2} = start_port(D#daemon.cmd), + true = ets:insert(Table, D#daemon{ + port=Port2, status=running, kill=undefined, + errors=Errors, buf=[] + }), + {noreply, Table} + end; + _Else -> + throw(error) + end; +handle_info({Port, {data, {noeol, Data}}}, Table) -> + [#daemon{buf=Buf}=D] = ets:lookup(Table, Port), + true = ets:insert(Table, D#daemon{buf=[Data | Buf]}), + {noreply, Table}; +handle_info({Port, {data, {eol, Data}}}, Table) -> + [#daemon{buf=Buf}=D] = ets:lookup(Table, Port), + Line = lists:reverse(Buf, Data), + % The first line echoed back is the kill command + % for when we go to get rid of the port. Lines after + % that are considered part of the stdio API. + case D#daemon.kill of + undefined -> + true = ets:insert(Table, D#daemon{kill=?b2l(Line), buf=[]}); + _Else -> + D2 = case (catch ?JSON_DECODE(Line)) of + {invalid_json, Rejected} -> + ?LOG_ERROR("Ignoring OS daemon request: ~p", [Rejected]), + D; + JSON -> + {ok, D3} = handle_port_message(D, JSON), + D3 + end, + true = ets:insert(Table, D2#daemon{buf=[]}) + end, + {noreply, Table}; +handle_info({Port, Error}, Table) -> + ?LOG_ERROR("Unexpectd message from port ~p: ~p", [Port, Error]), + stop_port(Port), + [D] = ets:lookup(Table, Port), + true = ets:insert(Table, D#daemon{status=restarting, buf=nil}), + {noreply, Table}; +handle_info(Msg, Table) -> + ?LOG_ERROR("Unexpected info message to ~p: ~p", [?MODULE, Msg]), + {stop, error, Table}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +% Internal API + +% +% Port management helpers +% + +start_port(Command) -> + PrivDir = couch_util:priv_dir(), + Spawnkiller = filename:join(PrivDir, "couchspawnkillable"), + Port = open_port({spawn, Spawnkiller ++ " " ++ Command}, ?PORT_OPTIONS), + {ok, Port}. + + +stop_port(#daemon{port=Port, kill=undefined}=D) -> + ?LOG_ERROR("Stopping daemon without a kill command: ~p", [D#daemon.name]), + catch port_close(Port); +stop_port(#daemon{port=Port}=D) -> + ?LOG_DEBUG("Stopping daemon: ~p", [D#daemon.name]), + os:cmd(D#daemon.kill), + catch port_close(Port). + + +handle_port_message(#daemon{port=Port}=Daemon, [<<"get">>, Section]) -> + KVs = couch_config:get(Section), + Data = lists:map(fun({K, V}) -> {?l2b(K), ?l2b(V)} end, KVs), + Json = iolist_to_binary(?JSON_ENCODE({Data})), + port_command(Port, <<Json/binary, "\n">>), + {ok, Daemon}; +handle_port_message(#daemon{port=Port}=Daemon, [<<"get">>, Section, Key]) -> + Value = case couch_config:get(Section, Key, null) of + null -> null; + String -> ?l2b(String) + end, + Json = iolist_to_binary(?JSON_ENCODE(Value)), + port_command(Port, <<Json/binary, "\n">>), + {ok, Daemon}; +handle_port_message(Daemon, [<<"register">>, Sec]) when is_binary(Sec) -> + Patterns = lists:usort(Daemon#daemon.cfg_patterns ++ [{?b2l(Sec)}]), + {ok, Daemon#daemon{cfg_patterns=Patterns}}; +handle_port_message(Daemon, [<<"register">>, Sec, Key]) + when is_binary(Sec) andalso is_binary(Key) -> + Pattern = {?b2l(Sec), ?b2l(Key)}, + Patterns = lists:usort(Daemon#daemon.cfg_patterns ++ [Pattern]), + {ok, Daemon#daemon{cfg_patterns=Patterns}}; +handle_port_message(#daemon{name=Name}=Daemon, [<<"log">>, Msg]) -> + handle_log_message(Name, Msg, <<"info">>), + {ok, Daemon}; +handle_port_message(#daemon{name=Name}=Daemon, [<<"log">>, Msg, {Opts}]) -> + Level = couch_util:get_value(<<"level">>, Opts, <<"info">>), + handle_log_message(Name, Msg, Level), + {ok, Daemon}; +handle_port_message(#daemon{name=Name}=Daemon, Else) -> + ?LOG_ERROR("Daemon ~p made invalid request: ~p", [Name, Else]), + {ok, Daemon}. + + +handle_log_message(Name, Msg, _Level) when not is_binary(Msg) -> + ?LOG_ERROR("Invalid log message from daemon ~p: ~p", [Name, Msg]); +handle_log_message(Name, Msg, <<"debug">>) -> + ?LOG_DEBUG("Daemon ~p :: ~s", [Name, ?b2l(Msg)]); +handle_log_message(Name, Msg, <<"info">>) -> + ?LOG_INFO("Daemon ~p :: ~s", [Name, ?b2l(Msg)]); +handle_log_message(Name, Msg, <<"error">>) -> + ?LOG_ERROR("Daemon: ~p :: ~s", [Name, ?b2l(Msg)]); +handle_log_message(Name, Msg, Level) -> + ?LOG_ERROR("Invalid log level from daemon: ~p", [Level]), + ?LOG_INFO("Daemon: ~p :: ~s", [Name, ?b2l(Msg)]). + +% +% Daemon management helpers +% + +reload_daemons(Table) -> + % List of daemons we want to have running. + Configured = lists:sort(couch_config:get("os_daemons")), + + % Remove records for daemons that were halted. + MSpecHalted = #daemon{name='$1', cmd='$2', status=halted, _='_'}, + Halted = lists:sort([{N, C} || [N, C] <- ets:match(Table, MSpecHalted)]), + ok = stop_os_daemons(Table, find_to_stop(Configured, Halted, [])), + + % Stop daemons that are running + % Start newly configured daemons + MSpecRunning = #daemon{name='$1', cmd='$2', status=running, _='_'}, + Running = lists:sort([{N, C} || [N, C] <- ets:match(Table, MSpecRunning)]), + ok = stop_os_daemons(Table, find_to_stop(Configured, Running, [])), + ok = boot_os_daemons(Table, find_to_boot(Configured, Running, [])), + ok. + + +restart_daemons(Table, Sect, Key) -> + restart_daemons(Table, Sect, Key, ets:first(Table)). + +restart_daemons(_, _, _, '$end_of_table') -> + ok; +restart_daemons(Table, Sect, Key, Port) -> + [D] = ets:lookup(Table, Port), + HasSect = lists:member({Sect}, D#daemon.cfg_patterns), + HasKey = lists:member({Sect, Key}, D#daemon.cfg_patterns), + case HasSect or HasKey of + true -> + stop_port(D), + D2 = D#daemon{status=restarting, buf=nil}, + true = ets:insert(Table, D2); + _ -> + ok + end, + restart_daemons(Table, Sect, Key, ets:next(Table, Port)). + + +stop_os_daemons(_Table, []) -> + ok; +stop_os_daemons(Table, [{Name, Cmd} | Rest]) -> + [[Port]] = ets:match(Table, #daemon{port='$1', name=Name, cmd=Cmd, _='_'}), + [D] = ets:lookup(Table, Port), + case D#daemon.status of + halted -> + ets:delete(Table, Port); + _ -> + stop_port(D), + D2 = D#daemon{status=stopping, errors=nil, buf=nil}, + true = ets:insert(Table, D2) + end, + stop_os_daemons(Table, Rest). + +boot_os_daemons(_Table, []) -> + ok; +boot_os_daemons(Table, [{Name, Cmd} | Rest]) -> + {ok, Port} = start_port(Cmd), + true = ets:insert(Table, #daemon{port=Port, name=Name, cmd=Cmd}), + boot_os_daemons(Table, Rest). + +% Elements unique to the configured set need to be booted. +find_to_boot([], _Rest, Acc) -> + % Nothing else configured. + Acc; +find_to_boot([D | R1], [D | R2], Acc) -> + % Elements are equal, daemon already running. + find_to_boot(R1, R2, Acc); +find_to_boot([D1 | R1], [D2 | _]=A2, Acc) when D1 < D2 -> + find_to_boot(R1, A2, [D1 | Acc]); +find_to_boot(A1, [_ | R2], Acc) -> + find_to_boot(A1, R2, Acc); +find_to_boot(Rest, [], Acc) -> + % No more candidates for already running. Boot all. + Rest ++ Acc. + +% Elements unique to the running set need to be killed. +find_to_stop([], Rest, Acc) -> + % The rest haven't been found, so they must all + % be ready to die. + Rest ++ Acc; +find_to_stop([D | R1], [D | R2], Acc) -> + % Elements are equal, daemon already running. + find_to_stop(R1, R2, Acc); +find_to_stop([D1 | R1], [D2 | _]=A2, Acc) when D1 < D2 -> + find_to_stop(R1, A2, Acc); +find_to_stop(A1, [D2 | R2], Acc) -> + find_to_stop(A1, R2, [D2 | Acc]); +find_to_stop(_, [], Acc) -> + % No more running daemons to worry about. + Acc. + +should_halt(Errors) -> + RetryTimeCfg = couch_config:get("os_daemon_settings", "retry_time", "5"), + RetryTime = list_to_integer(RetryTimeCfg), + + Now = now(), + RecentErrors = lists:filter(fun(Time) -> + timer:now_diff(Now, Time) =< RetryTime * 1000000 + end, Errors), + + RetryCfg = couch_config:get("os_daemon_settings", "max_retries", "3"), + Retries = list_to_integer(RetryCfg), + + {length(RecentErrors) >= Retries, RecentErrors}. diff --git a/apps/couch/src/couch_proc_manager.erl b/apps/couch/src/couch_proc_manager.erl index 509da9ba..438f7973 100644 --- a/apps/couch/src/couch_proc_manager.erl +++ b/apps/couch/src/couch_proc_manager.erl @@ -150,7 +150,7 @@ make_proc(Pid, Lang, Mod) -> {ok, Proc}. get_query_server_config() -> - Limit = couch_config:get("query_server_config", "reduce_limit", "true"), + Limit = couch_config:get("query_server_config", <<"reduce_limit">>, "true"), {[{<<"reduce_limit">>, list_to_atom(Limit)}]}. proc_with_ddoc(DDoc, DDocKey, Procs) -> diff --git a/apps/couch/src/couch_query_servers.erl b/apps/couch/src/couch_query_servers.erl index 58540660..be7c465b 100644 --- a/apps/couch/src/couch_query_servers.erl +++ b/apps/couch/src/couch_query_servers.erl @@ -12,7 +12,7 @@ -module(couch_query_servers). --export([start_doc_map/2, map_docs/2, stop_doc_map/1]). +-export([start_doc_map/3, map_docs/2, stop_doc_map/1]). -export([reduce/3, rereduce/3,validate_doc_update/5]). -export([filter_docs/5]). @@ -23,8 +23,13 @@ -include("couch_db.hrl"). -start_doc_map(Lang, Functions) -> +start_doc_map(Lang, Functions, Lib) -> Proc = get_os_process(Lang), + case Lib of + {[]} -> ok; + Lib -> + true = proc_prompt(Proc, [<<"add_lib">>, Lib]) + end, lists:foreach(fun(FunctionSource) -> true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource]) end, Functions), @@ -137,18 +142,35 @@ builtin_reduce(Re, [<<"_stats",_/binary>>|BuiltinReds], KVs, Acc) -> builtin_sum_rows(KVs) -> lists:foldl(fun - ([_Key, Value], Acc) when is_number(Value) -> + ([_Key, Value], Acc) when is_number(Value), is_number(Acc) -> Acc + Value; + ([_Key, Value], Acc) when is_list(Value), is_list(Acc) -> + sum_terms(Acc, Value); + ([_Key, Value], Acc) when is_number(Value), is_list(Acc) -> + sum_terms(Acc, [Value]); + ([_Key, Value], Acc) when is_list(Value), is_number(Acc) -> + sum_terms([Acc], Value); (_Else, _Acc) -> - throw({invalid_value, <<"builtin _sum function requires map values to be numbers">>}) + throw({invalid_value, <<"builtin _sum function requires map values to be numbers or lists of numbers">>}) end, 0, KVs). +sum_terms([], []) -> + []; +sum_terms([_|_]=Xs, []) -> + Xs; +sum_terms([], [_|_]=Ys) -> + Ys; +sum_terms([X|Xs], [Y|Ys]) when is_number(X), is_number(Y) -> + [X+Y | sum_terms(Xs,Ys)]; +sum_terms(_, _) -> + throw({invalid_value, <<"builtin _sum function requires map values to be numbers or lists of numbers">>}). + builtin_stats(_, []) -> {[{sum,0}, {count,0}, {min,0}, {max,0}, {sumsqr,0}]}; builtin_stats(reduce, [[_,First]|Rest]) when is_number(First) -> Stats = lists:foldl(fun([_K,V], {S,C,Mi,Ma,Sq}) when is_number(V) -> - {S+V, C+1, erlang:min(Mi,V), erlang:max(Ma,V), Sq+(V*V)}; + {S+V, C+1, lists:min([Mi, V]), lists:max([Ma, V]), Sq+(V*V)}; (_, _) -> throw({invalid_value, <<"builtin _stats function requires map values to be numbers">>}) @@ -160,7 +182,7 @@ builtin_stats(rereduce, [[_,First]|Rest]) -> {[{sum,Sum0}, {count,Cnt0}, {min,Min0}, {max,Max0}, {sumsqr,Sqr0}]} = First, Stats = lists:foldl(fun([_K,Red], {S,C,Mi,Ma,Sq}) -> {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]} = Red, - {Sum+S, Cnt+C, erlang:min(Min,Mi), erlang:max(Max,Ma), Sqr+Sq} + {Sum+S, Cnt+C, lists:min([Min, Mi]), lists:max([Max, Ma]), Sqr+Sq} end, {Sum0,Cnt0,Min0,Max0,Sqr0}, Rest), {Sum, Cnt, Min, Max, Sqr} = Stats, {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]}. diff --git a/apps/couch/src/couch_ref_counter.erl b/apps/couch/src/couch_ref_counter.erl index 5a111ab6..a774f469 100644 --- a/apps/couch/src/couch_ref_counter.erl +++ b/apps/couch/src/couch_ref_counter.erl @@ -24,14 +24,14 @@ drop(RefCounterPid) -> drop(RefCounterPid, self()). drop(RefCounterPid, Pid) -> - gen_server:call(RefCounterPid, {drop, Pid}). + gen_server:call(RefCounterPid, {drop, Pid}, infinity). add(RefCounterPid) -> add(RefCounterPid, self()). add(RefCounterPid, Pid) -> - gen_server:call(RefCounterPid, {add, Pid}). + gen_server:call(RefCounterPid, {add, Pid}, infinity). count(RefCounterPid) -> gen_server:call(RefCounterPid, count). diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index c804b49d..2d011aab 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -15,9 +15,14 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([replicate/2, checkpoint/1]). +-export([replicate/2, replicate/3, checkpoint/1]). +-export([make_replication_id/2]). +-export([start_replication/4, end_replication/1, get_result/4]). -include("couch_db.hrl"). +-include_lib("ibrowse/include/ibrowse.hrl"). + +-define(REP_ID_VERSION, 2). -record(state, { changes_feed, @@ -47,7 +52,6 @@ committed_seq = 0, stats = nil, - doc_ids = nil, source_db_update_notifier = nil, target_db_update_notifier = nil }). @@ -61,40 +65,55 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{}); %% function handling POST to _replicate -replicate({Props}=PostBody, UserCtx) -> - {BaseId, Extension} = make_replication_id(PostBody, UserCtx), - Replicator = {BaseId ++ Extension, - {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, - temporary, - 1, - worker, - [?MODULE] - }, +replicate(PostBody, UserCtx) -> + replicate(PostBody, UserCtx, couch_replication_manager). +replicate({Props}=PostBody, UserCtx, Module) -> + RepId = make_replication_id(PostBody, UserCtx), case couch_util:get_value(<<"cancel">>, Props, false) of true -> - case supervisor:terminate_child(couch_rep_sup, BaseId ++ Extension) of - {error, not_found} -> - {error, not_found}; - ok -> - ok = supervisor:delete_child(couch_rep_sup, BaseId ++ Extension), - {ok, {cancelled, ?l2b(BaseId)}} - end; + end_replication(RepId); false -> - Server = start_replication_server(Replicator), + Server = start_replication(PostBody, RepId, UserCtx, Module), + get_result(Server, RepId, PostBody, UserCtx) + end. - case couch_util:get_value(<<"continuous">>, Props, false) of - true -> - {ok, {continuous, ?l2b(BaseId)}}; - false -> - get_result(Server, PostBody, UserCtx) +end_replication({BaseId, Extension}) -> + RepId = BaseId ++ Extension, + case supervisor:terminate_child(couch_rep_sup, RepId) of + {error, not_found} = R -> + R; + ok -> + case supervisor:delete_child(couch_rep_sup, RepId) of + ok -> + {ok, {cancelled, ?l2b(BaseId)}}; + {error, not_found} -> + {ok, {cancelled, ?l2b(BaseId)}}; + {error, _} = Error -> + Error end end. +start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx, Module) -> + Replicator = { + BaseId ++ Extension, + {gen_server, start_link, + [?MODULE, [RepId, RepDoc, UserCtx, Module], []]}, + temporary, + 1, + worker, + [?MODULE] + }, + start_replication_server(Replicator). + checkpoint(Server) -> gen_server:cast(Server, do_checkpoint). -get_result(Server, PostBody, UserCtx) -> +get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) -> + case couch_util:get_value(<<"continuous">>, Props, false) of + true -> + {ok, {continuous, ?l2b(BaseId)}}; + false -> try gen_server:call(Server, get_result, infinity) of retry -> replicate(PostBody, UserCtx); Else -> Else @@ -105,6 +124,7 @@ get_result(Server, PostBody, UserCtx) -> exit:{normal, {gen_server, call, [Server, get_result , infinity]}} -> %% we made the call during terminate replicate(PostBody, UserCtx) + end end. init(InitArgs) -> @@ -115,13 +135,12 @@ init(InitArgs) -> {stop, Error} end. -do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> +do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx, Module] = InitArgs) -> process_flag(trap_exit, true), SourceProps = couch_util:get_value(<<"source">>, PostProps), TargetProps = couch_util:get_value(<<"target">>, PostProps), - DocIds = couch_util:get_value(<<"doc_ids">>, PostProps, nil), Continuous = couch_util:get_value(<<"continuous">>, PostProps, false), CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false), @@ -133,29 +152,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> SourceInfo = dbinfo(Source), TargetInfo = dbinfo(Target), - case DocIds of - List when is_list(List) -> - % Fast replication using only a list of doc IDs to replicate. - % Replication sessions, checkpoints and logs are not created - % since the update sequence number of the source DB is not used - % for determining which documents are copied into the target DB. - SourceLog = nil, - TargetLog = nil, - - StartSeq = nil, - History = nil, - - ChangesFeed = nil, - MissingRevs = nil, - - {ok, Reader} = - couch_rep_reader:start_link(self(), Source, DocIds, PostProps); - - _ -> - % Replication using the _changes API (DB sequence update numbers). - SourceLog = open_replication_log(Source, RepId), - TargetLog = open_replication_log(Target, RepId), - + [SourceLog, TargetLog] = find_replication_logs( + [Source, Target], BaseId, {PostProps}, UserCtx), {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), {ok, ChangesFeed} = @@ -163,9 +161,7 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> {ok, MissingRevs} = couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps), {ok, Reader} = - couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps) - end, - + couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps), {ok, Writer} = couch_rep_writer:start_link(self(), Target, Reader, PostProps), @@ -176,10 +172,12 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> ets:insert(Stats, {docs_written, 0}), ets:insert(Stats, {doc_write_failures, 0}), - {ShortId, _} = lists:split(6, RepId), + {ShortId, _} = lists:split(6, BaseId), couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s", [ShortId, dbname(Source), dbname(Target)]), "Starting"), + Module:replication_started(RepId), + State = #state{ changes_feed = ChangesFeed, missing_revs = MissingRevs, @@ -202,7 +200,6 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - doc_ids = DocIds, source_db_update_notifier = source_db_update_notifier(Source), target_db_update_notifier = target_db_update_notifier(Target) }, @@ -274,24 +271,24 @@ handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. -terminate(normal, #state{checkpoint_scheduled=nil} = State) -> - do_terminate(State); +terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId, _, _, Module]} = State) -> + do_terminate(State), + Module:replication_completed(RepId); -terminate(normal, State) -> +terminate(normal, #state{init_args=[RepId, _, _, Module]} = State) -> timer:cancel(State#state.checkpoint_scheduled), - do_terminate(do_checkpoint(State)); + do_terminate(do_checkpoint(State)), + Module:replication_completed(RepId); -terminate(Reason, State) -> - #state{ - listeners = Listeners, - source = Source, - target = Target, - stats = Stats - } = State, +terminate(shutdown, #state{listeners = Listeners} = State) -> + % continuous replication stopped + [gen_server:reply(L, {ok, stopped}) || L <- Listeners], + terminate_cleanup(State); + +terminate(Reason, #state{listeners = Listeners, init_args=[RepId, _, _, Module]} = State) -> [gen_server:reply(L, {error, Reason}) || L <- Listeners], - ets:delete(Stats), - close_db(Target), - close_db(Source). + terminate_cleanup(State), + Module:replication_error(RepId, Reason). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -321,7 +318,14 @@ start_replication_server(Replicator) -> throw({db_not_found, <<"could not open ", DbUrl/binary>>}); {error, {unauthorized, DbUrl}} -> throw({unauthorized, - <<"unauthorized to access database ", DbUrl/binary>>}) + <<"unauthorized to access or create database ", DbUrl/binary>>}); + {error, {'EXIT', {badarg, + [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} -> + % Clause to deal with a change in the supervisor module introduced + % in R14B02. For more details consult the thread at: + % http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html + _ = supervisor:delete_child(couch_rep_sup, RepId), + start_replication_server(Replicator) end; {error, {already_started, Pid}} -> ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), @@ -330,7 +334,7 @@ start_replication_server(Replicator) -> throw({db_not_found, <<"could not open ", DbUrl/binary>>}); {error, {{unauthorized, DbUrl}, _}} -> throw({unauthorized, - <<"unauthorized to access database ", DbUrl/binary>>}) + <<"unauthorized to access or create database ", DbUrl/binary>>}) end. compare_replication_logs(SrcDoc, TgtDoc) -> @@ -390,30 +394,11 @@ dbname(#db{name = Name}) -> dbinfo(#http_db{} = Db) -> {DbProps} = couch_rep_httpc:request(Db), - [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]; + [{couch_util:to_existing_atom(K), V} || {K,V} <- DbProps]; dbinfo(Db) -> {ok, Info} = couch_db:get_db_info(Db), Info. -do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) -> - #state{ - listeners = Listeners, - rep_starttime = ReplicationStartTime, - stats = Stats - } = State, - - RepByDocsJson = {[ - {<<"start_time">>, ?l2b(ReplicationStartTime)}, - {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())}, - {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, - {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, - {<<"doc_write_failures">>, - ets:lookup_element(Stats, doc_write_failures, 2)} - ]}, - - terminate_cleanup(State), - [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)]; - do_terminate(State) -> #state{ checkpoint_history = CheckpointHistory, @@ -475,7 +460,7 @@ has_session_id(SessionId, [{Props} | Rest]) -> has_session_id(SessionId, Rest) end. -maybe_append_options(Options, Props) -> +maybe_append_options(Options, {Props}) -> lists:foldl(fun(Option, Acc) -> Acc ++ case couch_util:get_value(Option, Props, false) of @@ -486,13 +471,39 @@ maybe_append_options(Options, Props) -> end end, [], Options). -make_replication_id({Props}, UserCtx) -> - %% funky algorithm to preserve backwards compatibility +make_replication_id(RepProps, UserCtx) -> + BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION), + Extension = maybe_append_options( + [<<"continuous">>, <<"create_target">>], RepProps), + {BaseId, Extension}. + +% Versioned clauses for generating replication ids +% If a change is made to how replications are identified +% add a new clause and increase ?REP_ID_VERSION at the top +make_replication_id({Props}, UserCtx, 2) -> + {ok, HostName} = inet:gethostname(), + Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of + P when is_number(P) -> + P; + _ -> + % On restart we might be called before the couch_httpd process is + % started. + % TODO: we might be under an SSL socket server only, or both under + % SSL and a non-SSL socket. + % ... mochiweb_socket_server:get(https, port) + list_to_integer(couch_config:get("httpd", "port", "5984")) + end, + Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), + Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), + maybe_append_filters({Props}, [HostName, Port, Src, Tgt], UserCtx); +make_replication_id({Props}, UserCtx, 1) -> {ok, HostName} = inet:gethostname(), - % Port = mochiweb_socket_server:get(couch_httpd, port), Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), - Base = [HostName, Src, Tgt] ++ + maybe_append_filters({Props}, [HostName, Src, Tgt], UserCtx). + +maybe_append_filters({Props}, Base, UserCtx) -> + Base2 = Base ++ case couch_util:get_value(<<"filter">>, Props) of undefined -> case couch_util:get_value(<<"doc_ids">>, Props) of @@ -502,11 +513,47 @@ make_replication_id({Props}, UserCtx) -> [DocIds] end; Filter -> - [Filter, couch_util:get_value(<<"query_params">>, Props, {[]})] + [filter_code(Filter, Props, UserCtx), + couch_util:get_value(<<"query_params">>, Props, {[]})] end, - Extension = maybe_append_options( - [<<"continuous">>, <<"create_target">>], Props), - {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}. + couch_util:to_hex(couch_util:md5(term_to_binary(Base2))). + +filter_code(Filter, Props, UserCtx) -> + {DDocName, FilterName} = + case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of + {match, [DDocName0, FilterName0]} -> + {DDocName0, FilterName0}; + _ -> + throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>}) + end, + ProxyParams = parse_proxy_params( + couch_util:get_value(<<"proxy">>, Props, [])), + DbName = couch_util:get_value(<<"source">>, Props), + Source = try + open_db(DbName, UserCtx, ProxyParams) + catch + _Tag:DbError -> + DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s", + [couch_util:url_strip_password(DbName), couch_util:to_binary(DbError)]), + throw({error, iolist_to_binary(DbErrorMsg)}) + end, + try + Body = case (catch open_doc(Source, <<"_design/", DDocName/binary>>)) of + {ok, #doc{body = Body0}} -> + Body0; + DocError -> + DocErrorMsg = io_lib:format( + "Couldn't open document `_design/~s` from source " + "database `~s`: ~s", + [dbname(Source), DDocName, couch_util:to_binary(DocError)]), + throw({error, iolist_to_binary(DocErrorMsg)}) + end, + Code = couch_util:get_nested_json_value( + Body, [<<"filters">>, FilterName]), + re:replace(Code, "^\s*(.*?)\s*$", "\\1", [{return, binary}]) + after + close_db(Source) + end. maybe_add_trailing_slash(Url) -> re:replace(Url, "[^/]$", "&/", [{return, list}]). @@ -528,27 +575,54 @@ get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) -> get_rep_endpoint(UserCtx, <<DbName/binary>>) -> {local, DbName, UserCtx}. -open_replication_log(#http_db{}=Db, RepId) -> - DocId = ?LOCAL_DOC_PREFIX ++ RepId, - Req = Db#http_db{resource=couch_util:url_encode(DocId)}, +find_replication_logs(DbList, RepId, RepProps, UserCtx) -> + LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), + fold_replication_logs(DbList, ?REP_ID_VERSION, + LogId, LogId, RepProps, UserCtx, []). + +% Accumulate the replication logs +% Falls back to older log document ids and migrates them +fold_replication_logs([], _Vsn, _LogId, _NewId, _RepProps, _UserCtx, Acc) -> + lists:reverse(Acc); +fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId, + RepProps, UserCtx, Acc) -> + case open_replication_log(Db, LogId) of + {error, not_found} when Vsn > 1 -> + OldRepId = make_replication_id(RepProps, UserCtx, Vsn - 1), + fold_replication_logs(Dbs, Vsn - 1, + ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, RepProps, UserCtx, Acc); + {error, not_found} -> + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + RepProps, UserCtx, [#doc{id=NewId}|Acc]); + {ok, Doc} when LogId =:= NewId -> + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + RepProps, UserCtx, [Doc|Acc]); + {ok, Doc} -> + MigratedLog = #doc{id=NewId,body=Doc#doc.body}, + fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, + RepProps, UserCtx, [MigratedLog|Acc]) + end. + +open_replication_log(Db, DocId) -> + case open_doc(Db, DocId) of + {ok, Doc} -> + ?LOG_DEBUG("found a replication log for ~s", [dbname(Db)]), + {ok, Doc}; + _ -> + ?LOG_DEBUG("didn't find a replication log for ~s", [dbname(Db)]), + {error, not_found} + end. + +open_doc(#http_db{} = Db, DocId) -> + Req = Db#http_db{resource = couch_util:encode_doc_id(DocId)}, case couch_rep_httpc:request(Req) of {[{<<"error">>, _}, {<<"reason">>, _}]} -> - ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), - #doc{id=?l2b(DocId)}; + {error, not_found}; Doc -> - ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), - couch_doc:from_json_obj(Doc) + {ok, couch_doc:from_json_obj(Doc)} end; -open_replication_log(Db, RepId) -> - DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), - case couch_db:open_doc(Db, DocId, []) of - {ok, Doc} -> - ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), - Doc; - _ -> - ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), - #doc{id=DocId} - end. +open_doc(Db, DocId) -> + couch_db:open_doc(Db, DocId). open_db(Props, UserCtx, ProxyParams) -> open_db(Props, UserCtx, ProxyParams, false). @@ -575,18 +649,18 @@ open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) -> try - case CreateTarget of - true -> - ok = couch_httpd:verify_is_server_admin(UserCtx), - couch_server:create(DbName, [{user_ctx, UserCtx}]); + case CreateTarget of + true -> + ok = couch_httpd:verify_is_server_admin(UserCtx), + couch_server:create(DbName, [{user_ctx, UserCtx}]); false -> ok - end, + end, - case couch_db:open(DbName, [{user_ctx, UserCtx}]) of - {ok, Db} -> - couch_db:monitor(Db), - Db; + case couch_db:open(DbName, [{user_ctx, UserCtx}]) of + {ok, Db} -> + couch_db:monitor(Db), + Db; {not_found, no_db_file} -> throw({db_not_found, DbName}) end @@ -619,32 +693,54 @@ do_checkpoint(State) -> rep_starttime = ReplicationStartTime, src_starttime = SrcInstanceStartTime, tgt_starttime = TgtInstanceStartTime, - stats = Stats + stats = Stats, + init_args = [_RepId, {RepDoc} | _] } = State, case commit_to_both(Source, Target, NewSeqNum) of {SrcInstanceStartTime, TgtInstanceStartTime} -> ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p", [dbname(Source), dbname(Target), NewSeqNum]), + EndTime = ?l2b(httpd_util:rfc1123_date()), + StartTime = ?l2b(ReplicationStartTime), + DocsRead = ets:lookup_element(Stats, docs_read, 2), + DocsWritten = ets:lookup_element(Stats, docs_written, 2), + DocWriteFailures = ets:lookup_element(Stats, doc_write_failures, 2), NewHistoryEntry = {[ {<<"session_id">>, SessionId}, - {<<"start_time">>, list_to_binary(ReplicationStartTime)}, - {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, + {<<"start_time">>, StartTime}, + {<<"end_time">>, EndTime}, {<<"start_last_seq">>, StartSeqNum}, {<<"end_last_seq">>, NewSeqNum}, {<<"recorded_seq">>, NewSeqNum}, {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, - {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, - {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, - {<<"doc_write_failures">>, - ets:lookup_element(Stats, doc_write_failures, 2)} + {<<"docs_read">>, DocsRead}, + {<<"docs_written">>, DocsWritten}, + {<<"doc_write_failures">>, DocWriteFailures} ]}, - % limit history to 50 entries - NewRepHistory = {[ + BaseHistory = [ {<<"session_id">>, SessionId}, {<<"source_last_seq">>, NewSeqNum}, - {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} - ]}, + {<<"replication_id_version">>, ?REP_ID_VERSION} + ] ++ case couch_util:get_value(<<"doc_ids">>, RepDoc) of + undefined -> + []; + DocIds when is_list(DocIds) -> + % backwards compatibility with the result of a replication by + % doc IDs in versions 0.11.x and 1.0.x + [ + {<<"start_time">>, StartTime}, + {<<"end_time">>, EndTime}, + {<<"docs_read">>, DocsRead}, + {<<"docs_written">>, DocsWritten}, + {<<"doc_write_failures">>, DocWriteFailures} + ] + end, + % limit history to 50 entries + NewRepHistory = { + BaseHistory ++ + [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}] + }, try {SrcRevPos,SrcRevId} = @@ -760,9 +856,9 @@ ensure_full_commit(Source, RequiredSeq) -> InstanceStartTime end. -update_local_doc(#http_db{} = Db, #doc{id=DocId} = Doc) -> +update_local_doc(#http_db{} = Db, Doc) -> Req = Db#http_db{ - resource = couch_util:url_encode(DocId), + resource = couch_util:encode_doc_id(Doc), method = put, body = couch_doc:to_json_obj(Doc, [attachments]), headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers] @@ -787,9 +883,13 @@ parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) -> parse_proxy_params([]) -> []; parse_proxy_params(ProxyUrl) -> - {url, _, Base, Port, User, Passwd, _Path, _Proto} = - ibrowse_lib:parse_url(ProxyUrl), - [{proxy_host, Base}, {proxy_port, Port}] ++ + #url{ + host = Host, + port = Port, + username = User, + password = Passwd + } = ibrowse_lib:parse_url(ProxyUrl), + [{proxy_host, Host}, {proxy_port, Port}] ++ case is_list(User) andalso is_list(Passwd) of false -> []; diff --git a/apps/couch/src/couch_rep_att.erl b/apps/couch/src/couch_rep_att.erl index 72c723e8..9988c5db 100644 --- a/apps/couch/src/couch_rep_att.erl +++ b/apps/couch/src/couch_rep_att.erl @@ -105,8 +105,7 @@ validate_headers(_Req, 200, Headers) -> MochiHeaders = mochiweb_headers:make(Headers), {ok, mochiweb_headers:get_value("Content-Encoding", MochiHeaders)}; validate_headers(Req, Code, Headers) when Code > 299, Code < 400 -> - Url = couch_rep_httpc:redirect_url(Headers, Req#http_db.url), - NewReq = couch_rep_httpc:redirected_request(Req, Url), + NewReq = couch_rep_httpc:redirected_request(Code, Headers, Req), {ibrowse_req_id, ReqId} = couch_rep_httpc:request(NewReq), receive {ibrowse_async_headers, ReqId, NewCode, NewHeaders} -> {ok, Encoding} = validate_headers(NewReq, list_to_integer(NewCode), diff --git a/apps/couch/src/couch_rep_changes_feed.erl b/apps/couch/src/couch_rep_changes_feed.erl index 032f62a3..36fe82aa 100644 --- a/apps/couch/src/couch_rep_changes_feed.erl +++ b/apps/couch/src/couch_rep_changes_feed.erl @@ -18,6 +18,7 @@ -export([start_link/4, next/1, stop/1]). -define(BUFFER_SIZE, 1000). +-define(DOC_IDS_FILTER_NAME, "_doc_ids"). -include("couch_db.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). @@ -33,9 +34,15 @@ count = 0, partial_chunk = <<>>, reply_to = nil, - rows = queue:new() + rows = queue:new(), + doc_ids = nil }). +-import(couch_util, [ + get_value/2, + get_value/3 +]). + start_link(Parent, Source, StartSeq, PostProps) -> gen_server:start_link(?MODULE, [Parent, Source, StartSeq, PostProps], []). @@ -46,9 +53,9 @@ stop(Server) -> catch gen_server:call(Server, stop), ok. -init([Parent, #http_db{}=Source, Since, PostProps]) -> +init([Parent, #http_db{headers = Headers0} = Source, Since, PostProps]) -> process_flag(trap_exit, true), - Feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of + Feed = case get_value(<<"continuous">>, PostProps, false) of false -> normal; true -> @@ -60,51 +67,59 @@ init([Parent, #http_db{}=Source, Since, PostProps]) -> {"since", Since}, {"feed", Feed} ], - QS = case couch_util:get_value(<<"filter">>, PostProps) of + {QS, Method, Body, Headers} = case get_value(<<"doc_ids">>, PostProps) of undefined -> - BaseQS; - FilterName -> - {Params} = couch_util:get_value(<<"query_params">>, PostProps, {[]}), - lists:foldr( - fun({K, V}, QSAcc) -> - Ks = couch_util:to_list(K), - case proplists:is_defined(Ks, QSAcc) of - true -> - QSAcc; - false -> - [{Ks, V} | QSAcc] - end - end, - [{"filter", FilterName} | BaseQS], - Params - ) + {maybe_add_filter_qs_params(PostProps, BaseQS), get, nil, Headers0}; + DocIds when is_list(DocIds) -> + Headers1 = [{"Content-Type", "application/json"} | Headers0], + QS1 = [{"filter", ?l2b(?DOC_IDS_FILTER_NAME)} | BaseQS], + {QS1, post, {[{<<"doc_ids">>, DocIds}]}, Headers1} end, Pid = couch_rep_httpc:spawn_link_worker_process(Source), Req = Source#http_db{ + method = Method, + body = Body, resource = "_changes", qs = QS, conn = Pid, options = [{stream_to, {self(), once}}] ++ lists:keydelete(inactivity_timeout, 1, Source#http_db.options), - headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}] + headers = Headers -- [{"Accept-Encoding", "gzip"}] }, {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req), Args = [Parent, Req, Since, PostProps], + State = #state{ + conn = Pid, + last_seq = Since, + reqid = ReqId, + init_args = Args, + doc_ids = get_value(<<"doc_ids">>, PostProps, nil) + }, receive {ibrowse_async_headers, ReqId, "200", _} -> ibrowse:stream_next(ReqId), - {ok, #state{conn=Pid, last_seq=Since, reqid=ReqId, init_args=Args}}; - {ibrowse_async_headers, ReqId, Code, Hdrs} when Code=="301"; Code=="302" -> - stop_link_worker(Pid), - Url2 = couch_rep_httpc:redirect_url(Hdrs, Req#http_db.url), - Req2 = couch_rep_httpc:redirected_request(Req, Url2), - Pid2 = couch_rep_httpc:spawn_link_worker_process(Req2), - Req3 = Req2#http_db{conn = Pid2}, - {ibrowse_req_id, ReqId2} = couch_rep_httpc:request(Req3), - Args2 = [Parent, Req3, Since, PostProps], - receive {ibrowse_async_headers, ReqId2, "200", _} -> - {ok, #state{conn=Pid2, last_seq=Since, reqid=ReqId2, init_args=Args2}} + {ok, State}; + {ibrowse_async_headers, ReqId, Code, Hdrs} + when Code =:= "301"; Code =:= "302"; Code =:= "303" -> + {ReqId2, Req2} = redirect_req(Req, Code, Hdrs), + receive + {ibrowse_async_headers, ReqId2, "200", _} -> + {ok, State#state{ + conn = Req2#http_db.conn, + reqid = ReqId2, + init_args = [Parent, Req2, Since, PostProps]}}; + {ibrowse_async_headers, ReqId2, "405", _} when Method =:= post -> + {ReqId3, Req3} = req_no_builtin_doc_ids(Req2, ReqId2), + receive + {ibrowse_async_headers, ReqId3, "200", _} -> + {ok, State#state{ + conn = Req3#http_db.conn, + reqid = ReqId3, + init_args = [Parent, Req3, Since, PostProps]}} + after 30000 -> + {stop, changes_timeout} + end after 30000 -> {stop, changes_timeout} end; @@ -113,7 +128,30 @@ init([Parent, #http_db{}=Source, Since, PostProps]) -> ?LOG_INFO("source doesn't have _changes, trying _all_docs_by_seq", []), Self = self(), BySeqPid = spawn_link(fun() -> by_seq_loop(Self, Source, Since) end), - {ok, #state{last_seq=Since, changes_loop=BySeqPid, init_args=Args}}; + {ok, State#state{changes_loop = BySeqPid}}; + {ibrowse_async_headers, ReqId, "405", _} when Method =:= post -> + {ReqId2, Req2} = req_no_builtin_doc_ids(Req, ReqId), + receive + {ibrowse_async_headers, ReqId2, "200", _} -> + {ok, State#state{ + conn = Req2#http_db.conn, + reqid = ReqId2, + init_args = [Parent, Req2, Since, PostProps]}}; + {ibrowse_async_headers, ReqId, Code, Hdrs} + when Code =:= "301"; Code =:= "302"; Code =:= "303" -> + {ReqId3, Req3} = redirect_req(Req2, Code, Hdrs), + receive + {ibrowse_async_headers, ReqId3, "200", _} -> + {ok, State#state{ + conn = Req3#http_db.conn, + reqid = ReqId3, + init_args = [Parent, Req3, Since, PostProps]}} + after 30000 -> + {stop, changes_timeout} + end + after 30000 -> + {stop, changes_timeout} + end; {ibrowse_async_headers, ReqId, Code, _} -> {stop, {changes_error_code, list_to_integer(Code)}} after 10000 -> @@ -123,11 +161,17 @@ init([Parent, #http_db{}=Source, Since, PostProps]) -> init([_Parent, Source, Since, PostProps] = InitArgs) -> process_flag(trap_exit, true), Server = self(), + Filter = case get_value(<<"doc_ids">>, PostProps) of + undefined -> + ?b2l(get_value(<<"filter">>, PostProps, <<>>)); + DocIds when is_list(DocIds) -> + ?DOC_IDS_FILTER_NAME + end, ChangesArgs = #changes_args{ style = all_docs, since = Since, - filter = ?b2l(couch_util:get_value(<<"filter">>, PostProps, <<>>)), - feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of + filter = Filter, + feed = case get_value(<<"continuous">>, PostProps, false) of true -> "continuous"; false -> @@ -138,7 +182,7 @@ init([_Parent, Source, Since, PostProps] = InitArgs) -> ChangesPid = spawn_link(fun() -> ChangesFeedFun = couch_changes:handle_changes( ChangesArgs, - {json_req, filter_json_req(Source, PostProps)}, + {json_req, filter_json_req(Filter, Source, PostProps)}, Source ), ChangesFeedFun(fun({change, Change, _}, _) -> @@ -149,29 +193,49 @@ init([_Parent, Source, Since, PostProps] = InitArgs) -> end), {ok, #state{changes_loop=ChangesPid, init_args=InitArgs}}. -filter_json_req(Db, PostProps) -> - case couch_util:get_value(<<"filter">>, PostProps) of +maybe_add_filter_qs_params(PostProps, BaseQS) -> + case get_value(<<"filter">>, PostProps) of undefined -> - {[]}; + BaseQS; FilterName -> - {Query} = couch_util:get_value(<<"query_params">>, PostProps, {[]}), - {ok, Info} = couch_db:get_db_info(Db), - % simulate a request to db_name/_changes - {[ - {<<"info">>, {Info}}, - {<<"id">>, null}, - {<<"method">>, 'GET'}, - {<<"path">>, [couch_db:name(Db), <<"_changes">>]}, - {<<"query">>, {[{<<"filter">>, FilterName} | Query]}}, - {<<"headers">>, []}, - {<<"body">>, []}, - {<<"peer">>, <<"replicator">>}, - {<<"form">>, []}, - {<<"cookie">>, []}, - {<<"userCtx">>, couch_util:json_user_ctx(Db)} - ]} + {Params} = get_value(<<"query_params">>, PostProps, {[]}), + lists:foldr( + fun({K, V}, QSAcc) -> + Ks = couch_util:to_list(K), + case proplists:is_defined(Ks, QSAcc) of + true -> + QSAcc; + false -> + [{Ks, V} | QSAcc] + end + end, + [{"filter", FilterName} | BaseQS], + Params + ) end. +filter_json_req([], _Db, _PostProps) -> + {[]}; +filter_json_req(?DOC_IDS_FILTER_NAME, _Db, PostProps) -> + {[{<<"doc_ids">>, get_value(<<"doc_ids">>, PostProps)}]}; +filter_json_req(FilterName, Db, PostProps) -> + {Query} = get_value(<<"query_params">>, PostProps, {[]}), + {ok, Info} = couch_db:get_db_info(Db), + % simulate a request to db_name/_changes + {[ + {<<"info">>, {Info}}, + {<<"id">>, null}, + {<<"method">>, 'GET'}, + {<<"path">>, [couch_db:name(Db), <<"_changes">>]}, + {<<"query">>, {[{<<"filter">>, FilterName} | Query]}}, + {<<"headers">>, []}, + {<<"body">>, []}, + {<<"peer">>, <<"replicator">>}, + {<<"form">>, []}, + {<<"cookie">>, []}, + {<<"userCtx">>, couch_util:json_user_ctx(Db)} + ]}. + handle_call({add_change, Row}, From, State) -> handle_add_change(Row, From, State); @@ -191,6 +255,10 @@ handle_info({ibrowse_async_response, Id, {error, sel_conn_closed}}, #state{reqid=Id}=State) -> handle_retry(State); +handle_info({ibrowse_async_response, Id, {error, connection_closed}}, + #state{reqid=Id}=State) -> + handle_retry(State); + handle_info({ibrowse_async_response, Id, {error,E}}, #state{reqid=Id}=State) -> {stop, {error, E}, State}; @@ -240,12 +308,9 @@ code_change(_OldVsn, State, _Extra) -> %internal funs handle_add_change(Row, From, #state{reply_to=nil} = State) -> - #state{ - count = Count, - rows = Rows - } = State, - NewState = State#state{count=Count+1, rows=queue:in(Row,Rows)}, - if Count < ?BUFFER_SIZE -> + {Rows2, Count2} = queue_changes_row(Row, State), + NewState = State#state{count = Count2, rows = Rows2}, + if Count2 =< ?BUFFER_SIZE -> {reply, ok, NewState}; true -> {noreply, NewState#state{changes_from=From}} @@ -274,11 +339,10 @@ handle_headers(200, _, State) -> maybe_stream_next(State), {noreply, State}; handle_headers(Code, Hdrs, #state{init_args = InitArgs} = State) - when Code =:= 301 ; Code =:= 302 -> + when Code =:= 301 ; Code =:= 302 ; Code =:= 303 -> stop_link_worker(State#state.conn), - [Parent, #http_db{url = Url1} = Source, Since, PostProps] = InitArgs, - Url = couch_rep_httpc:redirect_url(Hdrs, Url1), - Source2 = couch_rep_httpc:redirected_request(Source, Url), + [Parent, Source, Since, PostProps] = InitArgs, + Source2 = couch_rep_httpc:redirected_request(Code, Hdrs, Source), Pid2 = couch_rep_httpc:spawn_link_worker_process(Source2), Source3 = Source2#http_db{conn = Pid2}, {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Source3), @@ -298,21 +362,17 @@ handle_messages([<<"]">>, <<"\"last_seq\":", _/binary>>], State) -> handle_feed_completion(State); handle_messages([<<"{\"last_seq\":", _/binary>>], State) -> handle_feed_completion(State); -handle_messages([Chunk|Rest], State) -> - #state{ - count = Count, - partial_chunk = Partial, - rows = Rows - } = State, +handle_messages([Chunk|Rest], #state{partial_chunk = Partial} = State) -> NewState = try Row = {Props} = decode_row(<<Partial/binary, Chunk/binary>>), case State of #state{reply_to=nil} -> + {Rows2, Count2} = queue_changes_row(Row, State), State#state{ - count = Count+1, last_seq = couch_util:get_value(<<"seq">>, Props), partial_chunk = <<>>, - rows=queue:in(Row,Rows) + rows = Rows2, + count = Count2 }; #state{count=0, reply_to=From}-> gen_server:reply(From, [Row]), @@ -400,3 +460,44 @@ stop_link_worker(Conn) when is_pid(Conn) -> catch ibrowse:stop_worker_process(Conn); stop_link_worker(_) -> ok. + +redirect_req(#http_db{conn = WorkerPid} = Req, Code, Headers) -> + stop_link_worker(WorkerPid), + Req2 = couch_rep_httpc:redirected_request(Code, Headers, Req), + WorkerPid2 = couch_rep_httpc:spawn_link_worker_process(Req2), + Req3 = Req2#http_db{conn = WorkerPid2}, + {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req3), + {ReqId, Req3}. + +req_no_builtin_doc_ids(#http_db{conn = WorkerPid, qs = QS} = Req, ReqId) -> + % CouchDB versions prior to 1.1.0 don't have the builtin filter _doc_ids + % and don't allow POSTing to /database/_changes + purge_req_messages(ReqId), + stop_link_worker(WorkerPid), + Req2 = Req#http_db{method = get, qs = lists:keydelete("filter", 1, QS)}, + WorkerPid2 = couch_rep_httpc:spawn_link_worker_process(Req2), + Req3 = Req2#http_db{conn = WorkerPid2}, + {ibrowse_req_id, ReqId2} = couch_rep_httpc:request(Req3), + {ReqId2, Req3}. + +purge_req_messages(ReqId) -> + ibrowse:stream_next(ReqId), + receive + {ibrowse_async_response, ReqId, {error, _}} -> + ok; + {ibrowse_async_response, ReqId, _Data} -> + purge_req_messages(ReqId); + {ibrowse_async_response_end, ReqId} -> + ok + end. + +queue_changes_row(Row, #state{doc_ids = nil, count = Count, rows = Rows}) -> + {queue:in(Row, Rows), Count + 1}; +queue_changes_row({RowProps} = Row, + #state{doc_ids = Ids, count = Count, rows = Rows}) -> + case lists:member(get_value(<<"id">>, RowProps), Ids) of + true -> + {queue:in(Row, Rows), Count + 1}; + false -> + {Rows, Count} + end. diff --git a/apps/couch/src/couch_rep_httpc.erl b/apps/couch/src/couch_rep_httpc.erl index 8153fdcf..e22c8f81 100644 --- a/apps/couch/src/couch_rep_httpc.erl +++ b/apps/couch/src/couch_rep_httpc.erl @@ -14,8 +14,9 @@ -include("couch_db.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). --export([db_exists/1, db_exists/2, full_url/1, request/1, redirected_request/2, - redirect_url/2, spawn_worker_process/1, spawn_link_worker_process/1]). +-export([db_exists/1, db_exists/2]). +-export([full_url/1, request/1, redirected_request/3]). +-export([spawn_worker_process/1, spawn_link_worker_process/1]). -export([ssl_options/1]). request(#http_db{} = Req) -> @@ -100,6 +101,9 @@ db_exists(Req, CanonicalUrl, CreateDB) -> {ok, "302", RespHeaders, _} -> RedirectUrl = redirect_url(RespHeaders, Req#http_db.url), db_exists(Req#http_db{url = RedirectUrl}, CanonicalUrl); + {ok, "303", RespHeaders, _} -> + RedirectUrl = redirect_url(RespHeaders, Req#http_db.url), + db_exists(Req#http_db{method = get, url = RedirectUrl}, CanonicalUrl); {ok, "401", _, _} -> throw({unauthorized, ?l2b(Url)}); Error -> @@ -123,15 +127,24 @@ config_http(Url) -> redirect_url(RespHeaders, OrigUrl) -> MochiHeaders = mochiweb_headers:make(RespHeaders), RedUrl = mochiweb_headers:get_value("Location", MochiHeaders), - {url, _, Base, Port, _, _, Path, Proto} = ibrowse_lib:parse_url(RedUrl), - {url, _, _, _, User, Passwd, _, _} = ibrowse_lib:parse_url(OrigUrl), + #url{ + host = Host, host_type = HostType, port = Port, + path = Path, protocol = Proto + } = ibrowse_lib:parse_url(RedUrl), + #url{username = User, password = Passwd} = ibrowse_lib:parse_url(OrigUrl), Creds = case is_list(User) andalso is_list(Passwd) of true -> User ++ ":" ++ Passwd ++ "@"; false -> [] end, - atom_to_list(Proto) ++ "://" ++ Creds ++ Base ++ ":" ++ + HostPart = case HostType of + ipv6_address -> + "[" ++ Host ++ "]"; + _ -> + Host + end, + atom_to_list(Proto) ++ "://" ++ Creds ++ HostPart ++ ":" ++ integer_to_list(Port) ++ Path. full_url(#http_db{url=Url} = Req) when is_binary(Url) -> @@ -154,9 +167,8 @@ process_response({ok, Status, Headers, Body}, Req) -> Code = list_to_integer(Status), if Code =:= 200; Code =:= 201 -> ?JSON_DECODE(maybe_decompress(Headers, Body)); - Code =:= 301; Code =:= 302 -> - RedirectUrl = redirect_url(Headers, Req#http_db.url), - do_request(redirected_request(Req, RedirectUrl)); + Code =:= 301; Code =:= 302 ; Code =:= 303 -> + do_request(redirected_request(Code, Headers, Req)); Code =:= 409 -> throw(conflict); Code >= 400, Code < 500 -> @@ -205,16 +217,26 @@ process_response({error, Reason}, Req) -> do_request(Req#http_db{retries = Retries-1, pause = 2*Pause}) end. -redirected_request(Req, RedirectUrl) -> +redirected_request(Code, Headers, Req) -> + RedirectUrl = redirect_url(Headers, Req#http_db.url), {Base, QStr, _} = mochiweb_util:urlsplit_path(RedirectUrl), QS = mochiweb_util:parse_qs(QStr), - Hdrs = case couch_util:get_value(<<"oauth">>, Req#http_db.auth) of + ReqHeaders = case couch_util:get_value(<<"oauth">>, Req#http_db.auth) of undefined -> Req#http_db.headers; _Else -> lists:keydelete("Authorization", 1, Req#http_db.headers) end, - Req#http_db{url=Base, resource="", qs=QS, headers=Hdrs}. + Req#http_db{ + method = case couch_util:to_integer(Code) of + 303 -> get; + _ -> Req#http_db.method + end, + url = Base, + resource = "", + qs = QS, + headers = ReqHeaders + }. spawn_worker_process(Req) -> Url = ibrowse_lib:parse_url(Req#http_db.url), diff --git a/apps/couch/src/couch_rep_reader.erl b/apps/couch/src/couch_rep_reader.erl index a7ae45a8..0d344e5c 100644 --- a/apps/couch/src/couch_rep_reader.erl +++ b/apps/couch/src/couch_rep_reader.erl @@ -17,7 +17,7 @@ -export([start_link/4, next/1]). --import(couch_util, [url_encode/1]). +-import(couch_util, [encode_doc_id/1]). -define (BUFFER_SIZE, 1000). -define (MAX_CONCURRENT_REQUESTS, 100). @@ -40,26 +40,17 @@ opened_seqs = [] }). -start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) -> - gen_server:start_link( - ?MODULE, [Parent, Source, MissingRevs_or_DocIds, PostProps], [] - ). +start_link(Parent, Source, MissingRevs, PostProps) -> + gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []). next(Pid) -> gen_server:call(Pid, next_docs, infinity). -init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) -> +init([Parent, Source, MissingRevs, _PostProps]) -> process_flag(trap_exit, true), Self = self(), ReaderLoop = spawn_link( - fun() -> reader_loop(Self, Parent, Source, MissingRevs_or_DocIds) end - ), - MissingRevs = case MissingRevs_or_DocIds of - Pid when is_pid(Pid) -> - Pid; - _ListDocIds -> - nil - end, + fun() -> reader_loop(Self, Parent, Source, MissingRevs) end), State = #state{ parent = Parent, source = Source, @@ -175,8 +166,6 @@ handle_reader_loop_complete(#state{monitor_count=0} = State) -> handle_reader_loop_complete(State) -> {noreply, State#state{complete = waiting_on_monitors}}. -calculate_new_high_seq(#state{missing_revs=nil}) -> - nil; calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) -> Open; calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]}) @@ -188,9 +177,9 @@ calculate_new_high_seq(State) -> hd(State#state.opened_seqs). split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) -> - case Length+size(Rev) > 8192 of + case Length+size(Rev)+3 > 8192 of false -> - {[[Rev|CurrentAcc] | Rest], BaseLength, Length+size(Rev)}; + {[[Rev|CurrentAcc] | Rest], BaseLength, Length+size(Rev)+3}; true -> {[[Rev],CurrentAcc|Rest], BaseLength, BaseLength} end. @@ -201,8 +190,6 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) -> % opened seqs greater than the smallest outstanding request. I believe its the % minimal set of info needed to correctly calculate which seqs have been % replicated (because remote docs can be opened out-of-order) -- APK -update_sequence_lists(_Seq, #state{missing_revs=nil} = State) -> - State; update_sequence_lists(Seq, State) -> Requested = lists:delete(Seq, State#state.requested_seqs), AllOpened = lists:merge([Seq], State#state.opened_seqs), @@ -226,8 +213,8 @@ open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) -> %% all this logic just splits up revision lists that are too long for %% MochiWeb into multiple requests BaseQS = [{revs,true}, {latest,true}, {att_encoding_info,true}], - BaseReq = DbS#http_db{resource=url_encode(DocId), qs=BaseQS}, - BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs= + BaseReq = DbS#http_db{resource=encode_doc_id(DocId), qs=BaseQS}, + BaseLength = length(couch_rep_httpc:full_url(BaseReq) ++ "&open_revs=[]"), {RevLists, _, _} = lists:foldl(fun split_revlist/2, {[[]], BaseLength, BaseLength}, couch_doc:revs_to_strs(Revs)), @@ -253,45 +240,6 @@ open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) -> end, lists:reverse(lists:foldl(Transform, [], JsonResults)). -open_doc(#http_db{url = Url} = DbS, DocId) -> - % get latest rev of the doc - Req = DbS#http_db{ - resource=url_encode(DocId), - qs=[{att_encoding_info, true}] - }, - {Props} = Json = couch_rep_httpc:request(Req), - case couch_util:get_value(<<"_id">>, Props) of - Id when is_binary(Id) -> - #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json), - [Doc#doc{ - atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts] - }]; - undefined -> - Err = couch_util:get_value(<<"error">>, Props, ?JSON_ENCODE(Json)), - ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s", - [DocId, couch_util:url_strip_password(Url), Err]), - [] - end. - -reader_loop(ReaderServer, Parent, Source1, DocIds) when is_list(DocIds) -> - case Source1 of - #http_db{} -> - [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil}, - infinity) || Id <- DocIds]; - _LocalDb -> - {ok, Source} = gen_server:call(Parent, get_source_db, infinity), - Docs = lists:foldr(fun(Id, Acc) -> - case couch_db:open_doc(Source, Id) of - {ok, Doc} -> - [Doc | Acc]; - _ -> - Acc - end - end, [], DocIds), - gen_server:call(ReaderServer, {add_docs, nil, Docs}, infinity) - end, - exit(complete); - reader_loop(ReaderServer, Parent, Source, MissingRevsServer) -> case couch_rep_missing_revs:next(MissingRevsServer) of complete -> @@ -326,8 +274,6 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq -> maybe_reopen_db(Db, _HighSeq) -> Db. -spawn_document_request(Source, Id, nil, nil) -> - spawn_document_request(Source, Id); spawn_document_request(Source, Id, Seq, Revs) -> Server = self(), SpawnFun = fun() -> @@ -335,11 +281,3 @@ spawn_document_request(Source, Id, Seq, Revs) -> gen_server:call(Server, {add_docs, Seq, Results}, infinity) end, spawn_monitor(SpawnFun). - -spawn_document_request(Source, Id) -> - Server = self(), - SpawnFun = fun() -> - Results = open_doc(Source, Id), - gen_server:call(Server, {add_docs, nil, Results}, infinity) - end, - spawn_monitor(SpawnFun). diff --git a/apps/couch/src/couch_rep_writer.erl b/apps/couch/src/couch_rep_writer.erl index cf98ccfb..2b722e8e 100644 --- a/apps/couch/src/couch_rep_writer.erl +++ b/apps/couch/src/couch_rep_writer.erl @@ -21,8 +21,6 @@ start_link(Parent, _Target, Reader, _PostProps) -> writer_loop(Parent, Reader) -> case couch_rep_reader:next(Reader) of - {complete, nil} -> - ok; {complete, FinalSeq} -> Parent ! {writer_checkpoint, FinalSeq}, ok; @@ -41,12 +39,7 @@ writer_loop(Parent, Reader) -> ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]), exit({attachment_request_failed, Err, Docs}) end, - case HighSeq of - nil -> - ok; - _SeqNumber -> - Parent ! {writer_checkpoint, HighSeq} - end, + Parent ! {writer_checkpoint, HighSeq}, couch_rep_att:cleanup(), couch_util:should_flush(), writer_loop(Parent, Reader) @@ -71,7 +64,7 @@ write_bulk_docs(_Db, []) -> []; write_bulk_docs(#http_db{headers = Headers} = Db, Docs) -> JsonDocs = [ - couch_doc:to_json_obj(Doc, [revs, att_gzip_length]) || Doc <- Docs + couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs ], Request = Db#http_db{ resource = "_bulk_docs", @@ -91,7 +84,7 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) -> JsonBytes = ?JSON_ENCODE( couch_doc:to_json_obj( Doc, - [follows, att_encoding_info, attachments] + [follows, att_encoding_info, attachments, revs] ) ), Boundary = couch_uuids:random(), @@ -120,7 +113,7 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) -> end end, Request = Db#http_db{ - resource = couch_util:url_encode(Doc#doc.id), + resource = couch_util:encode_doc_id(Doc), method = put, qs = [{new_edits, false}], body = {BodyFun, nil}, @@ -148,7 +141,8 @@ streamer_fun(Boundary, JsonBytes, Atts) -> {start, From} -> % better use a brand new queue, to ensure there's no garbage from % a previous (failed) iteration - {ok, DataQueue} = couch_work_queue:new(1024 * 1024, 1000), + {ok, DataQueue} = couch_work_queue:new( + [{max_size, 1024 * 1024}, {max_items, 1000}]), From ! {queue, DataQueue}, couch_doc:doc_to_multi_part_stream( Boundary, diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl new file mode 100644 index 00000000..3f7cc27c --- /dev/null +++ b/apps/couch/src/couch_replication_manager.erl @@ -0,0 +1,629 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replication_manager). +-behaviour(gen_server). + +% public API +-export([replication_started/1, replication_completed/1, replication_error/2]). + +% gen_server callbacks +-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]). +-export([code_change/3, terminate/2]). + +-include("couch_db.hrl"). +-include("couch_js_functions.hrl"). + +-define(DOC_TO_REP, couch_rep_doc_id_to_rep_id). +-define(REP_TO_STATE, couch_rep_id_to_rep_state). +-define(INITIAL_WAIT, 2.5). % seconds +-define(MAX_WAIT, 600). % seconds + +-record(state, { + changes_feed_loop = nil, + db_notifier = nil, + rep_db_name = nil, + rep_start_pids = [], + max_retries +}). + +-record(rep_state, { + doc_id, + user_ctx, + doc, + starting, + retries_left, + max_retries, + wait = ?INITIAL_WAIT +}). + +-import(couch_util, [ + get_value/2, + get_value/3, + to_binary/1 +]). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +replication_started({BaseId, _} = RepId) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{doc_id = DocId} -> + update_rep_doc(DocId, [ + {<<"_replication_state">>, <<"triggered">>}, + {<<"_replication_id">>, ?l2b(BaseId)}]), + ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity), + ?LOG_INFO("Document `~s` triggered replication `~s`", + [DocId, pp_rep_id(RepId)]) + end. + + +replication_completed(RepId) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{doc_id = DocId} -> + update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]), + ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity), + ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", + [pp_rep_id(RepId), DocId]) + end. + + +replication_error({BaseId, _} = RepId, Error) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{doc_id = DocId} -> + % TODO: maybe add error reason to replication document + update_rep_doc(DocId, [ + {<<"_replication_state">>, <<"error">>}, + {<<"_replication_id">>, ?l2b(BaseId)}]), + ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity) + end. + + +init(_) -> + process_flag(trap_exit, true), + ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]), + ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]), + Server = self(), + ok = couch_config:register( + fun("replicator", "db", NewName) -> + ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)}); + ("replicator", "max_replication_retry_count", V) -> + ok = gen_server:cast(Server, {set_max_retries, retries_value(V)}) + end + ), + {Loop, RepDbName} = changes_feed_loop(), + {ok, #state{ + changes_feed_loop = Loop, + rep_db_name = RepDbName, + db_notifier = db_update_notifier(), + max_retries = retries_value( + couch_config:get("replicator", "max_replication_retry_count", "10")) + }}. + + +handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) -> + NewState = try + process_update(State, Change) + catch + _Tag:Error -> + {RepProps} = get_value(doc, ChangeProps), + DocId = get_value(<<"_id">>, RepProps), + rep_db_update_error(Error, DocId), + State + end, + {reply, ok, NewState}; + +handle_call({rep_started, RepId}, _From, State) -> + case rep_state(RepId) of + nil -> + ok; + RepState -> + NewRepState = RepState#rep_state{ + starting = false, + retries_left = State#state.max_retries, + max_retries = State#state.max_retries, + wait = ?INITIAL_WAIT + }, + true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}) + end, + {reply, ok, State}; + +handle_call({rep_complete, RepId}, _From, State) -> + true = ets:delete(?REP_TO_STATE, RepId), + {reply, ok, State}; + +handle_call({rep_error, RepId, Error}, _From, State) -> + {reply, ok, replication_error(State, RepId, Error)}; + +handle_call(Msg, From, State) -> + ?LOG_ERROR("Replication manager received unexpected call ~p from ~p", + [Msg, From]), + {stop, {error, {unexpected_call, Msg}}, State}. + + +handle_cast({rep_db_changed, NewName}, #state{rep_db_name = NewName} = State) -> + {noreply, State}; + +handle_cast({rep_db_changed, _NewName}, State) -> + {noreply, restart(State)}; + +handle_cast({rep_db_created, NewName}, #state{rep_db_name = NewName} = State) -> + {noreply, State}; + +handle_cast({rep_db_created, _NewName}, State) -> + {noreply, restart(State)}; + +handle_cast({set_max_retries, MaxRetries}, State) -> + {noreply, State#state{max_retries = MaxRetries}}; + +handle_cast(Msg, State) -> + ?LOG_ERROR("Replication manager received unexpected cast ~p", [Msg]), + {stop, {error, {unexpected_cast, Msg}}, State}. + + +handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) -> + % replicator DB deleted + {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}}; + +handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) -> + ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]), + {stop, {db_update_notifier_died, Reason}, State}; + +handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) -> + % one of the replication start processes terminated successfully + {noreply, State#state{rep_start_pids = Pids -- [From]}}; + +handle_info({'DOWN', _Ref, _, _, _}, State) -> + % From a db monitor created by a replication process. Ignore. + {noreply, State}; + +handle_info(Msg, State) -> + ?LOG_ERROR("Replication manager received unexpected message ~p", [Msg]), + {stop, {unexpected_msg, Msg}, State}. + + +terminate(_Reason, State) -> + #state{ + rep_start_pids = StartPids, + changes_feed_loop = Loop, + db_notifier = Notifier + } = State, + stop_all_replications(), + lists:foreach( + fun(Pid) -> + catch unlink(Pid), + catch exit(Pid, stop) + end, + [Loop | StartPids]), + true = ets:delete(?REP_TO_STATE), + true = ets:delete(?DOC_TO_REP), + couch_db_update_notifier:stop(Notifier). + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +changes_feed_loop() -> + {ok, RepDb} = ensure_rep_db_exists(), + Server = self(), + Pid = spawn_link( + fun() -> + ChangesFeedFun = couch_changes:handle_changes( + #changes_args{ + include_docs = true, + feed = "continuous", + timeout = infinity, + db_open_options = [sys_db] + }, + {json_req, null}, + RepDb + ), + ChangesFeedFun( + fun({change, Change, _}, _) -> + case has_valid_rep_id(Change) of + true -> + ok = gen_server:call( + Server, {rep_db_update, Change}, infinity); + false -> + ok + end; + (_, _) -> + ok + end + ) + end + ), + couch_db:close(RepDb), + {Pid, couch_db:name(RepDb)}. + + +has_valid_rep_id({Change}) -> + has_valid_rep_id(get_value(<<"id">>, Change)); +has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) -> + false; +has_valid_rep_id(_Else) -> + true. + + +db_update_notifier() -> + Server = self(), + {ok, Notifier} = couch_db_update_notifier:start_link( + fun({created, DbName}) -> + case ?l2b(couch_config:get("replicator", "db", "_replicator")) of + DbName -> + ok = gen_server:cast(Server, {rep_db_created, DbName}); + _ -> + ok + end; + (_) -> + % no need to handle the 'deleted' event - the changes feed loop + % dies when the database is deleted + ok + end + ), + Notifier. + + +restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) -> + stop_all_replications(), + lists:foreach( + fun(Pid) -> + catch unlink(Pid), + catch exit(Pid, rep_db_changed) + end, + [Loop | StartPids]), + {NewLoop, NewRepDbName} = changes_feed_loop(), + State#state{ + changes_feed_loop = NewLoop, + rep_db_name = NewRepDbName, + rep_start_pids = [] + }. + + +process_update(State, {Change}) -> + {RepProps} = JsonRepDoc = get_value(doc, Change), + DocId = get_value(<<"_id">>, RepProps), + case get_value(<<"deleted">>, Change, false) of + true -> + rep_doc_deleted(DocId), + State; + false -> + case get_value(<<"_replication_state">>, RepProps) of + undefined -> + maybe_start_replication(State, DocId, JsonRepDoc); + <<"triggered">> -> + maybe_start_replication(State, DocId, JsonRepDoc); + <<"completed">> -> + replication_complete(DocId), + State; + <<"error">> -> + case ets:lookup(?DOC_TO_REP, DocId) of + [] -> + maybe_start_replication(State, DocId, JsonRepDoc); + _ -> + State + end + end + end. + + +rep_db_update_error(Error, DocId) -> + case Error of + {bad_rep_doc, Reason} -> + ok; + _ -> + Reason = to_binary(Error) + end, + ?LOG_ERROR("Replication manager, error processing document `~s`: ~s", + [DocId, Reason]), + update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]). + + +rep_user_ctx({RepDoc}) -> + case get_value(<<"user_ctx">>, RepDoc) of + undefined -> + #user_ctx{}; + {UserCtx} -> + #user_ctx{ + name = get_value(<<"name">>, UserCtx, null), + roles = get_value(<<"roles">>, UserCtx, []) + } + end. + + +maybe_start_replication(State, DocId, RepDoc) -> + UserCtx = rep_user_ctx(RepDoc), + {BaseId, _} = RepId = make_rep_id(RepDoc, UserCtx), + case rep_state(RepId) of + nil -> + RepState = #rep_state{ + doc_id = DocId, + user_ctx = UserCtx, + doc = RepDoc, + starting = true, + retries_left = State#state.max_retries, + max_retries = State#state.max_retries + }, + true = ets:insert(?REP_TO_STATE, {RepId, RepState}), + true = ets:insert(?DOC_TO_REP, {DocId, RepId}), + ?LOG_INFO("Attempting to start replication `~s` (document `~s`).", + [pp_rep_id(RepId), DocId]), + Server = self(), + Pid = spawn_link(fun() -> + start_replication(Server, RepDoc, RepId, UserCtx, 0) + end), + State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}; + #rep_state{doc_id = DocId} -> + State; + #rep_state{starting = false, doc_id = OtherDocId} -> + ?LOG_INFO("The replication specified by the document `~s` was already" + " triggered by the document `~s`", [DocId, OtherDocId]), + maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)), + State; + #rep_state{starting = true, doc_id = OtherDocId} -> + ?LOG_INFO("The replication specified by the document `~s` is already" + " being triggered by the document `~s`", [DocId, OtherDocId]), + maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)), + State + end. + + +make_rep_id(RepDoc, UserCtx) -> + try + couch_rep:make_replication_id(RepDoc, UserCtx) + catch + throw:{error, Reason} -> + throw({bad_rep_doc, Reason}); + Tag:Err -> + throw({bad_rep_doc, to_binary({Tag, Err})}) + end. + + +maybe_tag_rep_doc(DocId, {RepProps}, RepId) -> + case get_value(<<"_replication_id">>, RepProps) of + RepId -> + ok; + _ -> + update_rep_doc(DocId, [{<<"_replication_id">>, RepId}]) + end. + + +start_replication(Server, RepDoc, RepId, UserCtx, Wait) -> + ok = timer:sleep(Wait * 1000), + case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx, ?MODULE)) of + Pid when is_pid(Pid) -> + ok = gen_server:call(Server, {rep_started, RepId}, infinity), + couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); + Error -> + replication_error(RepId, Error) + end. + + +replication_complete(DocId) -> + case ets:lookup(?DOC_TO_REP, DocId) of + [{DocId, RepId}] -> + case rep_state(RepId) of + nil -> + couch_rep:end_replication(RepId); + #rep_state{} -> + ok + end, + true = ets:delete(?DOC_TO_REP, DocId); + _ -> + ok + end. + + +rep_doc_deleted(DocId) -> + case ets:lookup(?DOC_TO_REP, DocId) of + [{DocId, RepId}] -> + couch_rep:end_replication(RepId), + true = ets:delete(?REP_TO_STATE, RepId), + true = ets:delete(?DOC_TO_REP, DocId), + ?LOG_INFO("Stopped replication `~s` because replication document `~s`" + " was deleted", [pp_rep_id(RepId), DocId]); + [] -> + ok + end. + + +replication_error(State, RepId, Error) -> + case rep_state(RepId) of + nil -> + State; + RepState -> + maybe_retry_replication(RepId, RepState, Error, State) + end. + +maybe_retry_replication(RepId, #rep_state{retries_left = 0} = RepState, Error, State) -> + #rep_state{ + doc_id = DocId, + max_retries = MaxRetries + } = RepState, + couch_rep:end_replication(RepId), + true = ets:delete(?REP_TO_STATE, RepId), + true = ets:delete(?DOC_TO_REP, DocId), + ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s" + "~nReached maximum retry attempts (~p).", + [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]), + State; + +maybe_retry_replication(RepId, RepState, Error, State) -> + #rep_state{ + doc_id = DocId, + user_ctx = UserCtx, + doc = RepDoc + } = RepState, + #rep_state{wait = Wait} = NewRepState = state_after_error(RepState), + true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}), + ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s" + "~nRestarting replication in ~p seconds.", + [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]), + Server = self(), + Pid = spawn_link(fun() -> + start_replication(Server, RepDoc, RepId, UserCtx, Wait) + end), + State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}. + + +stop_all_replications() -> + ?LOG_INFO("Stopping all ongoing replications because the replicator" + " database was deleted or changed", []), + ets:foldl( + fun({_, RepId}, _) -> + couch_rep:end_replication(RepId) + end, + ok, ?DOC_TO_REP), + true = ets:delete_all_objects(?REP_TO_STATE), + true = ets:delete_all_objects(?DOC_TO_REP). + + +update_rep_doc(RepDocId, KVs) -> + {ok, RepDb} = ensure_rep_db_exists(), + try + case couch_db:open_doc(RepDb, RepDocId, []) of + {ok, LatestRepDoc} -> + update_rep_doc(RepDb, LatestRepDoc, KVs); + _ -> + ok + end + catch throw:conflict -> + % Shouldn't happen, as by default only the role _replicator can + % update replication documents. + ?LOG_ERROR("Conflict error when updating replication document `~s`." + " Retrying.", [RepDocId]), + ok = timer:sleep(5), + update_rep_doc(RepDocId, KVs) + after + couch_db:close(RepDb) + end. + +update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) -> + NewRepDocBody = lists:foldl( + fun({<<"_replication_state">> = K, State} = KV, Body) -> + case get_value(K, Body) of + State -> + Body; + _ -> + Body1 = lists:keystore(K, 1, Body, KV), + lists:keystore( + <<"_replication_state_time">>, 1, Body1, + {<<"_replication_state_time">>, timestamp()}) + end; + ({K, _V} = KV, Body) -> + lists:keystore(K, 1, Body, KV) + end, + RepDocBody, KVs), + case NewRepDocBody of + RepDocBody -> + ok; + _ -> + % Might not succeed - when the replication doc is deleted right + % before this update (not an error, ignore). + couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, []) + end. + + +% RFC3339 timestamps. +% Note: doesn't include the time seconds fraction (RFC3339 says it's optional). +timestamp() -> + {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()), + UTime = erlang:universaltime(), + LocalTime = calendar:universal_time_to_local_time(UTime), + DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) - + calendar:datetime_to_gregorian_seconds(UTime), + zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60), + iolist_to_binary( + io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s", + [Year, Month, Day, Hour, Min, Sec, + zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])). + +zone(Hr, Min) when Hr >= 0, Min >= 0 -> + io_lib:format("+~2..0w:~2..0w", [Hr, Min]); +zone(Hr, Min) -> + io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]). + + +ensure_rep_db_exists() -> + DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), + Opts = [ + {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}, + sys_db + ], + case couch_db:open(DbName, Opts) of + {ok, Db} -> + Db; + _Error -> + {ok, Db} = couch_db:create(DbName, Opts) + end, + ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>), + {ok, Db}. + + +ensure_rep_ddoc_exists(RepDb, DDocID) -> + case couch_db:open_doc(RepDb, DDocID, []) of + {ok, _Doc} -> + ok; + _ -> + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocID}, + {<<"language">>, <<"javascript">>}, + {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} + ]}), + {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, []) + end, + ok. + + +% pretty-print replication id +pp_rep_id({Base, Extension}) -> + Base ++ Extension. + + +rep_state(RepId) -> + case ets:lookup(?REP_TO_STATE, RepId) of + [{RepId, RepState}] -> + RepState; + [] -> + nil + end. + + +error_reason({error, Reason}) -> + Reason; +error_reason(Reason) -> + Reason. + + +retries_value("infinity") -> + infinity; +retries_value(Value) -> + list_to_integer(Value). + + +state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) -> + Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT), + case Left of + infinity -> + State#rep_state{wait = Wait2}; + _ -> + State#rep_state{retries_left = Left - 1, wait = Wait2} + end. diff --git a/apps/couch/src/couch_server.erl b/apps/couch/src/couch_server.erl index 4252a035..cfe0b5fc 100644 --- a/apps/couch/src/couch_server.erl +++ b/apps/couch/src/couch_server.erl @@ -80,6 +80,7 @@ check_dbname(#server{dbname_regexp=RegExp}, DbName) -> nomatch -> case DbName of "_users" -> ok; + "_replicator" -> ok; _Else -> {error, illegal_database_name} end; @@ -173,7 +174,7 @@ all_databases(Prefix) -> end, [list_to_binary(filename:rootname(RelativeFilename, ".couch")) | AccIn] end, []), - {ok, Filenames}. + {ok, lists:usort(Filenames)}. maybe_close_lru_db(#server{dbs_open=NumOpen, max_dbs_open=MaxOpen}=Server) diff --git a/apps/couch/src/couch_server_sup.erl b/apps/couch/src/couch_server_sup.erl index 6f6ca61a..726e397f 100644 --- a/apps/couch/src/couch_server_sup.erl +++ b/apps/couch/src/couch_server_sup.erl @@ -104,16 +104,23 @@ start_server(IniFiles) -> unlink(ConfigPid), Ip = couch_config:get("httpd", "bind_address"), - Port = mochiweb_socket_server:get(couch_httpd, port), io:format("Apache CouchDB has started. Time to relax.~n"), - - ?LOG_INFO("Apache CouchDB has started on http://~s:~w/", [Ip, Port]), - + Uris = [get_uri(Name, Ip) || Name <- [couch_httpd, https]], + [begin + case Uri of + undefined -> ok; + Uri -> ?LOG_INFO("Apache CouchDB has started on ~s", [Uri]) + end + end + || Uri <- Uris], case couch_config:get("couchdb", "uri_file", null) of null -> ok; UriFile -> - Line = io_lib:format("http://~s:~w/~n", [Ip, Port]), - file:write_file(UriFile, Line) + Lines = [begin case Uri of + undefined -> []; + Uri -> io_lib:format("~s~n", [Uri]) + end end || Uri <- Uris], + ok = file:write_file(UriFile, Lines) end, {ok, Pid}. @@ -127,3 +134,22 @@ config_change("couchdb", "util_driver_dir") -> init(ChildSpecs) -> {ok, ChildSpecs}. + +get_uri(Name, Ip) -> + case get_port(Name) of + undefined -> + undefined; + Port -> + io_lib:format("~s://~s:~w/", [get_scheme(Name), Ip, Port]) + end. + +get_scheme(couch_httpd) -> "http"; +get_scheme(https) -> "https". + +get_port(Name) -> + try + mochiweb_socket_server:get(Name, port) + catch + exit:{noproc, _}-> + undefined + end. diff --git a/apps/couch/src/couch_stream.erl b/apps/couch/src/couch_stream.erl index 04c17770..60af1c2b 100644 --- a/apps/couch/src/couch_stream.erl +++ b/apps/couch/src/couch_stream.erl @@ -24,7 +24,7 @@ -define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data --export([open/1, open/3, close/1, write/2, foldl/4, foldl/5, foldl_decode/6, +-export([open/1, open/3, close/1, write/2, foldl/4, foldl/5, range_foldl/6, foldl_decode/6, old_foldl/5,old_copy_to_new_stream/4]). -export([copy_to_new_stream/3,old_read_term/2]). -export([init/1, terminate/2, handle_call/3]). @@ -112,22 +112,60 @@ foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) -> foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) -> Md5 = couch_util:md5_final(Md5Acc), Acc; +foldl(Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> % 0110 UPGRADE CODE + foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc); foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> {ok, Bin} = couch_file:pread_iolist(Fd, Pos), Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, Bin)), Fun(Bin, Acc); +foldl(Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) -> + foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc); foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> {ok, Bin} = couch_file:pread_iolist(Fd, Pos), foldl(Fd, Rest, Md5, couch_util:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)). +range_foldl(Fd, PosList, From, To, Fun, Acc) -> + range_foldl(Fd, PosList, From, To, 0, Fun, Acc). + +range_foldl(_Fd, _PosList, _From, To, Off, _Fun, Acc) when Off >= To -> + Acc; +range_foldl(Fd, [Pos|Rest], From, To, Off, Fun, Acc) when is_integer(Pos) -> % old-style attachment + {ok, Bin} = couch_file:pread_iolist(Fd, Pos), + range_foldl(Fd, [{Pos, iolist_size(Bin)}] ++ Rest, From, To, Off, Fun, Acc); +range_foldl(Fd, [{_Pos, Size}|Rest], From, To, Off, Fun, Acc) when From > Off + Size -> + range_foldl(Fd, Rest, From, To, Off + Size, Fun, Acc); +range_foldl(Fd, [{Pos, Size}|Rest], From, To, Off, Fun, Acc) -> + {ok, Bin} = couch_file:pread_iolist(Fd, Pos), + Bin1 = if + From =< Off andalso To >= Off + Size -> Bin; %% the whole block is covered + true -> + PrefixLen = clip(From - Off, 0, Size), + PostfixLen = clip(Off + Size - To, 0, Size), + MatchLen = Size - PrefixLen - PostfixLen, + <<_Prefix:PrefixLen/binary,Match:MatchLen/binary,_Postfix:PostfixLen/binary>> = iolist_to_binary(Bin), + Match + end, + range_foldl(Fd, Rest, From, To, Off + Size, Fun, Fun(Bin1, Acc)). + +clip(Value, Lo, Hi) -> + if + Value < Lo -> Lo; + Value > Hi -> Hi; + true -> Value + end. + foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) -> Md5 = couch_util:md5_final(Md5Acc), Acc; +foldl_decode(DecFun, Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> + foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc); foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, EncBin)), Bin = DecFun(EncBin), Fun(Bin, Acc); +foldl_decode(DecFun, Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) -> + foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc); foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), Bin = DecFun(EncBin), @@ -227,7 +265,7 @@ handle_call({write, Bin}, _From, Stream) -> {ok, Pos} = couch_file:append_binary(Fd, WriteBin2), WrittenLen2 = WrittenLen + iolist_size(WriteBin2), Md5_2 = couch_util:md5_update(Md5, WriteBin2), - Written2 = [Pos|Written] + Written2 = [{Pos, iolist_size(WriteBin2)}|Written] end, {reply, ok, Stream#stream{ @@ -265,7 +303,7 @@ handle_call(close, _From, Stream) -> {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final}; _ -> {ok, Pos} = couch_file:append_binary(Fd, WriteBin2), - StreamInfo = lists:reverse(Written, [Pos]), + StreamInfo = lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]), StreamLen = WrittenLen + iolist_size(WriteBin2), {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final} end, diff --git a/apps/couch/src/couch_util.erl b/apps/couch/src/couch_util.erl index a7de6994..839f5956 100644 --- a/apps/couch/src/couch_util.erl +++ b/apps/couch/src/couch_util.erl @@ -15,10 +15,10 @@ -export([priv_dir/0, start_driver/1, normpath/1]). -export([should_flush/0, should_flush/1, to_existing_atom/1]). -export([rand32/0, implode/2, collate/2, collate/3]). --export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1]). +-export([abs_pathname/1,abs_pathname/2, trim/1]). -export([encodeBase64Url/1, decodeBase64Url/1]). --export([to_hex/1, parse_term/1, dict_find/3]). --export([file_read_size/1, get_nested_json_value/2, json_user_ctx/1]). +-export([validate_utf8/1, to_hex/1, parse_term/1, dict_find/3]). +-export([get_nested_json_value/2, json_user_ctx/1]). -export([proplist_apply_field/2, json_apply_field/2]). -export([to_binary/1, to_integer/1, to_list/1, url_encode/1]). -export([json_encode/1, json_decode/1]). @@ -28,9 +28,9 @@ -export([md5/1, md5_init/0, md5_update/2, md5_final/1]). -export([reorder_results/2]). -export([url_strip_password/1]). +-export([encode_doc_id/1]). -include("couch_db.hrl"). --include_lib("kernel/include/file.hrl"). % arbitrarily chosen amount of memory to use before flushing to disk -define(FLUSH_MAX_MEM, 10000000). @@ -107,6 +107,37 @@ simple_call(Pid, Message) -> erlang:demonitor(MRef, [flush]) end. +validate_utf8(Data) when is_list(Data) -> + validate_utf8(?l2b(Data)); +validate_utf8(Bin) when is_binary(Bin) -> + validate_utf8_fast(Bin, 0). + +validate_utf8_fast(B, O) -> + case B of + <<_:O/binary>> -> + true; + <<_:O/binary, C1, _/binary>> when + C1 < 128 -> + validate_utf8_fast(B, 1 + O); + <<_:O/binary, C1, C2, _/binary>> when + C1 >= 194, C1 =< 223, + C2 >= 128, C2 =< 191 -> + validate_utf8_fast(B, 2 + O); + <<_:O/binary, C1, C2, C3, _/binary>> when + C1 >= 224, C1 =< 239, + C2 >= 128, C2 =< 191, + C3 >= 128, C3 =< 191 -> + validate_utf8_fast(B, 3 + O); + <<_:O/binary, C1, C2, C3, C4, _/binary>> when + C1 >= 240, C1 =< 244, + C2 >= 128, C2 =< 191, + C3 >= 128, C3 =< 191, + C4 >= 128, C4 =< 191 -> + validate_utf8_fast(B, 4 + O); + _ -> + false + end. + to_hex([]) -> []; to_hex(Bin) when is_binary(Bin) -> @@ -204,18 +235,6 @@ separate_cmd_args(" " ++ Rest, CmdAcc) -> separate_cmd_args([Char|Rest], CmdAcc) -> separate_cmd_args(Rest, [Char | CmdAcc]). -% lowercases string bytes that are the ascii characters A-Z. -% All other characters/bytes are ignored. -ascii_lower(String) -> - ascii_lower(String, []). - -ascii_lower([], Acc) -> - lists:reverse(Acc); -ascii_lower([Char | RestString], Acc) when Char >= $A, Char =< $B -> - ascii_lower(RestString, [Char + ($a-$A) | Acc]); -ascii_lower([Char | RestString], Acc) -> - ascii_lower(RestString, [Char | Acc]). - % Is a character whitespace? is_whitespace($\s) -> true; is_whitespace($\t) -> true; @@ -315,14 +334,6 @@ dict_find(Key, Dict, DefaultValue) -> DefaultValue end. - -file_read_size(FileName) -> - case file:read_file_info(FileName) of - {ok, FileInfo} -> - FileInfo#file_info.size; - Error -> Error - end. - to_binary(V) when is_binary(V) -> V; to_binary(V) when is_list(V) -> @@ -413,19 +424,14 @@ compressible_att_type(MimeType) when is_binary(MimeType) -> compressible_att_type(MimeType) -> TypeExpList = re:split( couch_config:get("attachments", "compressible_types", ""), - ", ?", + "\\s*,\\s*", [{return, list}] ), lists:any( fun(TypeExp) -> Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"), "(?:\\s*;.*?)?\\s*", $$], - case re:run(MimeType, Regexp, [caseless]) of - {match, _} -> - true; - _ -> - false - end + re:run(MimeType, Regexp, [caseless]) =/= nomatch end, [T || T <- TypeExpList, T /= []] ). @@ -459,3 +465,14 @@ url_strip_password(Url) -> "http(s)?://([^:]+):[^@]+@(.*)$", "http\\1://\\2:*****@\\3", [{return, list}]). + +encode_doc_id(#doc{id = Id}) -> + encode_doc_id(Id); +encode_doc_id(Id) when is_list(Id) -> + encode_doc_id(?l2b(Id)); +encode_doc_id(<<"_design/", Rest/binary>>) -> + "_design/" ++ url_encode(Rest); +encode_doc_id(<<"_local/", Rest/binary>>) -> + "_local/" ++ url_encode(Rest); +encode_doc_id(Id) -> + url_encode(Id). diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl index 6093d69d..05174245 100644 --- a/apps/couch/src/couch_view.erl +++ b/apps/couch/src/couch_view.erl @@ -55,11 +55,22 @@ get_group_server(DbName, GroupId) -> get_group(Db, GroupId, Stale) -> MinUpdateSeq = case Stale of ok -> 0; + update_after -> 0; _Else -> couch_db:get_update_seq(Db) end, - couch_view_group:request_group( - get_group_server(couch_db:name(Db), GroupId), - MinUpdateSeq). + GroupPid = get_group_server(couch_db:name(Db), GroupId), + Result = couch_view_group:request_group(GroupPid, MinUpdateSeq), + case Stale of + update_after -> + % best effort, process might die + spawn(fun() -> + LastSeq = couch_db:get_update_seq(Db), + couch_view_group:request_group(GroupPid, LastSeq) + end); + _ -> + ok + end, + Result. get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc) -> couch_view_group:request_group( @@ -82,13 +93,18 @@ cleanup_index_files(Db) -> FileList = list_index_files(Db), - % regex that matches all ddocs - RegExp = "("++ string:join(Sigs, "|") ++")", + DeleteFiles = + if length(Sigs) =:= 0 -> + FileList; + true -> + % regex that matches all ddocs + RegExp = "("++ string:join(Sigs, "|") ++")", % filter out the ones in use - DeleteFiles = [FilePath - || FilePath <- FileList, - re:run(FilePath, RegExp, [{capture, none}]) =:= nomatch], + [FilePath || FilePath <- FileList, + re:run(FilePath, RegExp, [{capture, none}]) =:= nomatch] + end, + % delete unused files ?LOG_DEBUG("deleting unused view index files: ~p",[DeleteFiles]), RootDir = couch_config:get("couchdb", "view_index_dir"), @@ -277,36 +293,42 @@ terminate(_Reason, _Srv) -> ok. -handle_call({get_group_server, DbName, - #group{name=GroupId,sig=Sig}=Group}, _From, #server{root_dir=Root}=Server) -> +handle_call({get_group_server, DbName, #group{sig=Sig}=Group}, From, + #server{root_dir=Root}=Server) -> case ets:lookup(group_servers_by_sig, {DbName, Sig}) of [] -> - ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", - [GroupId, DbName]), - case (catch couch_view_group:start_link({Root, DbName, Group})) of - {ok, NewPid} -> - add_to_ets(NewPid, DbName, Sig), - {reply, {ok, NewPid}, Server}; - {error, invalid_view_seq} -> - do_reset_indexes(DbName, Root), - case (catch couch_view_group:start_link({Root, DbName, Group})) of - {ok, NewPid} -> - add_to_ets(NewPid, DbName, Sig), - {reply, {ok, NewPid}, Server}; - Error -> - {reply, Error, Server} - end; - Error -> - {reply, Error, Server} - end; + spawn_monitor(fun() -> new_group(Root, DbName, Group) end), + ets:insert(group_servers_by_sig, {{DbName, Sig}, [From]}), + {noreply, Server}; + [{_, WaitList}] when is_list(WaitList) -> + ets:insert(group_servers_by_sig, {{DbName, Sig}, [From | WaitList]}), + {noreply, Server}; [{_, ExistingPid}] -> {reply, {ok, ExistingPid}, Server} - end. + end; + +handle_call({reset_indexes, DbName}, _From, #server{root_dir=Root}=Server) -> + do_reset_indexes(DbName, Root), + {reply, ok, Server}. handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> do_reset_indexes(DbName, Root), {noreply, Server}. +new_group(Root, DbName, #group{name=GroupId, sig=Sig} = Group) -> + ?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.", + [GroupId, DbName]), + case (catch couch_view_group:start_link({Root, DbName, Group})) of + {ok, NewPid} -> + unlink(NewPid), + exit({DbName, Sig, {ok, NewPid}}); + {error, invalid_view_seq} -> + ok = gen_server:call(couch_view, {reset_indexes, DbName}), + new_group(Root, DbName, Group); + Error -> + exit({DbName, Sig, Error}) + end. + do_reset_indexes(DbName, Root) -> % shutdown all the updaters and clear the files, the db got changed Names = ets:lookup(couch_groups_by_db, DbName), @@ -333,6 +355,15 @@ handle_info({'EXIT', FromPid, Reason}, Server) -> [{_, {DbName, GroupId}}] -> delete_from_ets(FromPid, DbName, GroupId) end, + {noreply, Server}; + +handle_info({'DOWN', _, _, _, {DbName, Sig, Reply}}, Server) -> + [{_, WaitList}] = ets:lookup(group_servers_by_sig, {DbName, Sig}), + [gen_server:reply(From, Reply) || From <- WaitList], + case Reply of {ok, NewPid} -> + link(NewPid), + add_to_ets(NewPid, DbName, Sig); + _ -> ok end, {noreply, Server}. config_change("couchdb", "view_index_dir") -> diff --git a/apps/couch/src/couch_view_compactor.erl b/apps/couch/src/couch_view_compactor.erl index 38f63f66..69aaff00 100644 --- a/apps/couch/src/couch_view_compactor.erl +++ b/apps/couch/src/couch_view_compactor.erl @@ -55,8 +55,10 @@ compact_group(Group, EmptyGroup) -> Fun = fun({DocId, _ViewIdKeys} = KV, {Bt, Acc, TotalCopied, LastId}) -> if DocId =:= LastId -> % COUCHDB-999 - Msg = "Duplicates of ~s detected in ~s ~s - rebuild required", - exit(io_lib:format(Msg, [DocId, DbName, GroupId])); + ?LOG_ERROR("Duplicates of document `~s` detected in view group `~s`" + ", database `~s` - view rebuild, from scratch, is required", + [DocId, GroupId, DbName]), + exit({view_duplicated_id, DocId}); true -> ok end, if TotalCopied rem 10000 =:= 0 -> couch_task_status:update("Copied ~p of ~p Ids (~p%)", diff --git a/apps/couch/src/couch_view_group.erl b/apps/couch/src/couch_view_group.erl index 9aa6cd8d..11cb4c60 100644 --- a/apps/couch/src/couch_view_group.erl +++ b/apps/couch/src/couch_view_group.erl @@ -90,7 +90,7 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> _ -> try couch_db:monitor(Db) after couch_db:close(Db) end, {ok, #group_state{ - db_name= DbName, + db_name=DbName, init_args=InitArgs, group=Group#group{dbname=DbName}, ref_counter=erlang:monitor(process,Fd)}} @@ -395,11 +395,15 @@ prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)-> get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree,views=Views}) -> - ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views], - #index_header{seq=Seq, - purge_seq=PurgeSeq, - id_btree_state=couch_btree:get_state(IdBtree), - view_states=ViewStates}. + ViewStates = [ + {couch_btree:get_state(V#view.btree), V#view.update_seq, V#view.purge_seq} || V <- Views + ], + #index_header{ + seq=Seq, + purge_seq=PurgeSeq, + id_btree_state=couch_btree:get_state(IdBtree), + view_states=ViewStates + }. hex_sig(GroupSig) -> couch_util:to_hex(?b2l(GroupSig)). @@ -440,7 +444,7 @@ open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) -> reduce_funs= if RedSrc==[] -> []; true -> [{<<"_temp">>, RedSrc}] end, options=DesignOptions}, couch_db:close(Db), - {ok, set_view_sig(#group{name = <<"_temp">>, views=[View], + {ok, set_view_sig(#group{name = <<"_temp">>,lib={[]}, views=[View], def_lang=Language, design_options=DesignOptions})}; Error -> Error @@ -448,9 +452,40 @@ open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) -> set_view_sig(#group{ views=Views, + lib={[]}, + def_lang=Language, + design_options=DesignOptions}=G) -> + ViewInfo = [old_view_format(V) || V <- Views], + G#group{sig=couch_util:md5(term_to_binary({ViewInfo, Language, DesignOptions}))}; +set_view_sig(#group{ + views=Views, + lib=Lib, def_lang=Language, design_options=DesignOptions}=G) -> - G#group{sig=couch_util:md5(term_to_binary({Views, Language, DesignOptions}))}. + ViewInfo = [old_view_format(V) || V <- Views], + G#group{sig=couch_util:md5(term_to_binary({ViewInfo, Language, DesignOptions, sort_lib(Lib)}))}. + +% Use the old view record format so group sig's don't change +old_view_format(View) -> + { + view, + View#view.id_num, + View#view.map_names, + View#view.def, + View#view.btree, + View#view.reduce_funs, + View#view.options + }. + +sort_lib({Lib}) -> + sort_lib(Lib, []). +sort_lib([], LAcc) -> + lists:keysort(1, LAcc); +sort_lib([{LName, {LObj}}|Rest], LAcc) -> + LSorted = sort_lib(LObj, []), % descend into nested object + sort_lib(Rest, [{LName, LSorted}|LAcc]); +sort_lib([{LName, LCode}|Rest], LAcc) -> + sort_lib(Rest, [{LName, LCode}|LAcc]). open_db_group(DbName, GroupId) -> {Pid, Ref} = spawn_monitor(fun() -> @@ -509,52 +544,44 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>), {DesignOptions} = couch_util:get_value(<<"options">>, Fields, {[]}), {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}), + Lib = couch_util:get_value(<<"lib">>, RawViews, {[]}), % add the views to a dictionary object, with the map source as the key DictBySrc = lists:foldl( fun({Name, {MRFuns}}, DictBySrcAcc) -> - MapSrc = couch_util:get_value(<<"map">>, MRFuns), - RedSrc = couch_util:get_value(<<"reduce">>, MRFuns, null), - {ViewOptions} = couch_util:get_value(<<"options">>, MRFuns, {[]}), - View = - case dict:find({MapSrc, ViewOptions}, DictBySrcAcc) of - {ok, View0} -> View0; - error -> #view{def=MapSrc, options=ViewOptions} % create new view object - end, - View2 = - if RedSrc == null -> - View#view{map_names=[Name|View#view.map_names]}; - true -> - View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} - end, - dict:store({MapSrc, ViewOptions}, View2, DictBySrcAcc) + case couch_util:get_value(<<"map">>, MRFuns) of + undefined -> DictBySrcAcc; + MapSrc -> + RedSrc = couch_util:get_value(<<"reduce">>, MRFuns, null), + {ViewOptions} = couch_util:get_value(<<"options">>, MRFuns, {[]}), + View = + case dict:find({MapSrc, ViewOptions}, DictBySrcAcc) of + {ok, View0} -> View0; + error -> #view{def=MapSrc, options=ViewOptions} % create new view object + end, + View2 = + if RedSrc == null -> + View#view{map_names=[Name|View#view.map_names]}; + true -> + View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} + end, + dict:store({MapSrc, ViewOptions}, View2, DictBySrcAcc) + end end, dict:new(), RawViews), % number the views {Views, _N} = lists:mapfoldl( fun({_Src, View}, N) -> {View#view{id_num=N},N+1} end, 0, lists:sort(dict:to_list(DictBySrc))), - - #group{ - name = Id, - views = Views, - def_lang = Language, - design_options = DesignOptions, - sig = couch_util:md5(term_to_binary({Views, Language, DesignOptions})) - }. + set_view_sig(#group{name=Id, lib=Lib, views=Views, def_lang=Language, design_options=DesignOptions}). reset_group(DbName, #group{views=Views}=Group) -> - Group#group{ - fd = nil, - dbname = DbName, - query_server = nil, - current_seq = 0, - id_btree = nil, - views = [View#view{btree=nil} || View <- Views] - }. + Views2 = [View#view{btree=nil} || View <- Views], + Group#group{dbname=DbName,fd=nil,query_server=nil,current_seq=0, + id_btree=nil,views=Views2}. reset_file(Fd, DbName, #group{sig=Sig,name=Name} = Group) -> - ?LOG_INFO("Resetting group index \"~s\" in db ~s", [Name, DbName]), + ?LOG_DEBUG("Resetting group index \"~s\" in db ~s", [Name, DbName]), ok = couch_file:truncate(Fd, 0), ok = couch_file:write_header(Fd, {Sig, nil}), init_group(Fd, reset_group(DbName, Group), nil). @@ -566,7 +593,7 @@ init_group(Fd, #group{dbname=DbName, views=Views}=Group, nil) -> case couch_db:open(DbName, []) of {ok, Db} -> PurgeSeq = try couch_db:get_purge_seq(Db) after couch_db:close(Db) end, - Header = #index_header{purge_seq=PurgeSeq, view_states=[nil || _ <- Views]}, + Header = #index_header{purge_seq=PurgeSeq, view_states=[{nil, 0, 0} || _ <- Views]}, init_group(Fd, Group, Header); {not_found, no_db_file} -> ?LOG_ERROR("~p no_db_file ~p", [?MODULE, DbName]), @@ -575,9 +602,14 @@ init_group(Fd, #group{dbname=DbName, views=Views}=Group, nil) -> init_group(Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> #index_header{seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, + StateUpdate = fun + ({_, _, _}=State) -> State; + (State) -> {State, 0, 0} + end, + ViewStates2 = lists:map(StateUpdate, ViewStates), {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), Views2 = lists:zipwith( - fun(BtreeState, #view{reduce_funs=RedFuns,options=Options}=View) -> + fun({BTState, USeq, PSeq}, #view{reduce_funs=RedFuns,options=Options}=View) -> FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], ReduceFun = fun(reduce, KVs) -> @@ -601,11 +633,12 @@ init_group(Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> <<"raw">> -> Less = fun(A,B) -> A < B end end, - {ok, Btree} = couch_btree:open(BtreeState, Fd, [{less, Less}, - {reduce, ReduceFun}]), - View#view{btree=Btree} + {ok, Btree} = couch_btree:open(BTState, Fd, + [{less, Less}, {reduce, ReduceFun}] + ), + View#view{btree=Btree, update_seq=USeq, purge_seq=PSeq} end, - ViewStates, Views), + ViewStates2, Views), Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, views=Views2}. diff --git a/apps/couch/src/couch_view_updater.erl b/apps/couch/src/couch_view_updater.erl index a07e5dd3..90cb20d4 100644 --- a/apps/couch/src/couch_view_updater.erl +++ b/apps/couch/src/couch_view_updater.erl @@ -39,8 +39,10 @@ update(Owner, Group) -> couch_task_status:update(<<"Resetting view index due to lost purge entries.">>), exit(reset) end, - {ok, MapQueue} = couch_work_queue:new(100000, 500), - {ok, WriteQueue} = couch_work_queue:new(100000, 500), + {ok, MapQueue} = couch_work_queue:new( + [{max_size, 100000}, {max_items, 500}]), + {ok, WriteQueue} = couch_work_queue:new( + [{max_size, 100000}, {max_items, 500}]), Self = self(), ViewEmptyKVs = [{View, []} || View <- Group2#group.views], spawn_link(?MODULE, do_maps, [Group, MapQueue, WriteQueue, ViewEmptyKVs]), @@ -92,19 +94,25 @@ purge_index(Db, #group{views=Views, id_btree=IdBtree}=Group) -> end, dict:new(), Lookups), % Now remove the values from the btrees + PurgeSeq = couch_db:get_purge_seq(Db), Views2 = lists:map( fun(#view{id_num=Num,btree=Btree}=View) -> case dict:find(Num, ViewKeysToRemoveDict) of {ok, RemoveKeys} -> - {ok, Btree2} = couch_btree:add_remove(Btree, [], RemoveKeys), - View#view{btree=Btree2}; + {ok, ViewBtree2} = couch_btree:add_remove(Btree, [], RemoveKeys), + case ViewBtree2 =/= Btree of + true -> + View#view{btree=ViewBtree2, purge_seq=PurgeSeq}; + _ -> + View#view{btree=ViewBtree2} + end; error -> % no keys to remove in this view View end end, Views), Group#group{id_btree=IdBtree2, views=Views2, - purge_seq=couch_db:get_purge_seq(Db)}. + purge_seq=PurgeSeq}. -spec load_doc(#db{}, #doc_info{}, pid(), [atom()], boolean()) -> ok. load_doc(Db, DI, MapQueue, DocOpts, IncludeDesign) -> @@ -227,12 +235,12 @@ view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{Vie -spec view_compute(#group{}, [#doc{}]) -> {#group{}, any()}. view_compute(Group, []) -> {Group, []}; -view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) -> +view_compute(#group{def_lang=DefLang, lib=Lib, query_server=QueryServerIn}=Group, Docs) -> {ok, QueryServer} = case QueryServerIn of nil -> % doc map not started Definitions = [View#view.def || View <- Group#group.views], - couch_query_servers:start_doc_map(DefLang, Definitions); + couch_query_servers:start_doc_map(DefLang, Definitions, Lib); _ -> {ok, QueryServerIn} end, @@ -270,7 +278,12 @@ write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq, InitialBuild) Views2 = lists:zipwith(fun(View, {_View, AddKeyValues}) -> KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []), {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove), - View#view{btree = ViewBtree2} + case ViewBtree2 =/= View#view.btree of + true -> + View#view{btree=ViewBtree2, update_seq=NewSeq}; + _ -> + View#view{btree=ViewBtree2} + end end, Group#group.views, ViewKeyValuesToAdd), Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2}. diff --git a/apps/couch/src/couch_work_queue.erl b/apps/couch/src/couch_work_queue.erl index decfcad8..13ec7335 100644 --- a/apps/couch/src/couch_work_queue.erl +++ b/apps/couch/src/couch_work_queue.erl @@ -13,99 +13,139 @@ -module(couch_work_queue). -behaviour(gen_server). --export([new/2,queue/2,dequeue/1,dequeue/2,close/1]). --export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]). +% public API +-export([new/1, queue/2, dequeue/1, dequeue/2, close/1]). + +% gen_server callbacks +-export([init/1, terminate/2]). +-export([handle_call/3, handle_cast/2, code_change/3, handle_info/2]). -record(q, { - queue=queue:new(), - blocked=[], + queue = queue:new(), + blocked = [], max_size, max_items, - items=0, - size=0, - work_waiter=nil, - close_on_dequeue=false + items = 0, + size = 0, + work_waiters = [], + close_on_dequeue = false, + multi_workers = false }). -new(MaxSize, MaxItems) -> - gen_server:start_link(couch_work_queue, {MaxSize, MaxItems}, []). + +new(Options) -> + gen_server:start_link(couch_work_queue, Options, []). + queue(Wq, Item) -> gen_server:call(Wq, {queue, Item}, infinity). + dequeue(Wq) -> dequeue(Wq, all). + dequeue(Wq, MaxItems) -> - try gen_server:call(Wq, {dequeue, MaxItems}, infinity) + try + gen_server:call(Wq, {dequeue, MaxItems}, infinity) catch _:_ -> closed end. + close(Wq) -> gen_server:cast(Wq, close). -init({MaxSize,MaxItems}) -> - {ok, #q{max_size=MaxSize, max_items=MaxItems}}. +init(Options) -> + Q = #q{ + max_size = couch_util:get_value(max_size, Options), + max_items = couch_util:get_value(max_items, Options), + multi_workers = couch_util:get_value(multi_workers, Options, false) + }, + {ok, Q}. + + +terminate(_Reason, #q{work_waiters=Workers}) -> + lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers). -terminate(_Reason, #q{work_waiter=nil}) -> - ok; -terminate(_Reason, #q{work_waiter={WWFrom, _}}) -> - gen_server:reply(WWFrom, closed). -handle_call({queue, Item}, From, #q{work_waiter=nil}=Q0) -> - Q = Q0#q{size=Q0#q.size + byte_size(term_to_binary(Item)), - items=Q0#q.items + 1, - queue=queue:in(Item, Q0#q.queue)}, +handle_call({queue, Item}, From, #q{work_waiters = []} = Q0) -> + Q = Q0#q{size = Q0#q.size + byte_size(term_to_binary(Item)), + items = Q0#q.items + 1, + queue = queue:in(Item, Q0#q.queue)}, case (Q#q.size >= Q#q.max_size) orelse (Q#q.items >= Q#q.max_items) of true -> - {noreply, Q#q{blocked=[From | Q#q.blocked]}}; + {noreply, Q#q{blocked = [From | Q#q.blocked]}}; false -> {reply, ok, Q} end; -handle_call({queue, Item}, _From, #q{work_waiter={WWFrom, _Max}}=Q) -> - gen_server:reply(WWFrom, {ok, [Item]}), - {reply, ok, Q#q{work_waiter=nil}}; -handle_call({dequeue, _Max}, _From, #q{work_waiter=WW}) when WW /= nil -> - exit("Only one caller allowed to wait for work at a time"); -handle_call({dequeue, Max}, From, #q{items=0}=Q) -> - {noreply, Q#q{work_waiter={From, Max}}}; -handle_call({dequeue, Max}, _From, #q{queue=Queue, max_size=MaxSize, - max_items=MaxItems, items=Items,close_on_dequeue=Close}=Q) -> - if Max >= Items orelse Max == all -> - [gen_server:reply(From, ok) || From <- Q#q.blocked], - Q2 = #q{max_size=MaxSize, max_items=MaxItems}, - if Close -> - {stop, normal, {ok, queue:to_list(Queue)}, Q2}; - true -> - {reply, {ok, queue:to_list(Queue)}, Q2} - end; + +handle_call({queue, Item}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) -> + gen_server:reply(W, {ok, [Item]}), + {reply, ok, Q#q{work_waiters = Rest}}; + +handle_call({dequeue, Max}, From, Q) -> + #q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q, + case {Workers, Multi} of + {[_ | _], false} -> + exit("Only one caller allowed to wait for this work at a time"); + {[_ | _], true} -> + {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}}; + _ -> + case Count of + 0 -> + {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}}; + C when C > 0 -> + deliver_queue_items(Max, Q) + end + end. + + +deliver_queue_items(Max, Q) -> + #q{ + queue = Queue, + items = Count, + close_on_dequeue = Close, + blocked = Blocked + } = Q, + case (Max =:= all) orelse (Max >= Count) of + false -> + {Items, Queue2, Blocked2} = dequeue_items(Max, Queue, Blocked, []), + Q2 = Q#q{items = Count - Max, blocked = Blocked2, queue = Queue2}, + {reply, {ok, Items}, Q2}; true -> - {DequeuedItems, Queue2, Blocked2} = - dequeue_items(Max, Queue, Q#q.blocked, []), - {reply, {ok, DequeuedItems}, - Q#q{items=Items-Max,blocked=Blocked2,queue=Queue2}} + lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked), + Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()}, + case Close of + false -> + {reply, {ok, queue:to_list(Queue)}, Q2}; + true -> + {stop, normal, {ok, queue:to_list(Queue)}, Q2} + end end. + dequeue_items(0, Queue, Blocked, DequeuedAcc) -> {lists:reverse(DequeuedAcc), Queue, Blocked}; + dequeue_items(NumItems, Queue, Blocked, DequeuedAcc) -> {{value, Item}, Queue2} = queue:out(Queue), case Blocked of [] -> Blocked2 = Blocked; - [From|Blocked2] -> + [From | Blocked2] -> gen_server:reply(From, ok) end, - dequeue_items(NumItems-1, Queue2, Blocked2, [Item | DequeuedAcc]). + dequeue_items(NumItems - 1, Queue2, Blocked2, [Item | DequeuedAcc]). -handle_cast(close, #q{items=0}=Q) -> +handle_cast(close, #q{items = 0} = Q) -> {stop, normal, Q}; + handle_cast(close, Q) -> - {noreply, Q#q{close_on_dequeue=true}}. + {noreply, Q#q{close_on_dequeue = true}}. code_change(_OldVsn, State, _Extra) -> diff --git a/apps/couch/src/test_util.erl b/apps/couch/src/test_util.erl index e43338e7..55b95139 100644 --- a/apps/couch/src/test_util.erl +++ b/apps/couch/src/test_util.erl @@ -13,7 +13,7 @@ -module(test_util). -export([init_code_path/0]). --export([source_file/1, build_file/1]). +-export([source_file/1, build_file/1, config_files/0]). init_code_path() -> code:load_abs("apps/couch/test/etap/etap"). @@ -22,4 +22,12 @@ source_file(Name) -> filename:join(["apps/couch", Name]). build_file(Name) -> - filename:join(["apps/couch", Name]). + filename:join(["rel/overlay", Name]). + +config_files() -> + [ + build_file("etc/default.ini"), + build_file("etc/local.ini"), + source_file("test/etap/random_port.ini") + ]. + |