diff options
Diffstat (limited to 'apps/couch/src')
22 files changed, 534 insertions, 396 deletions
diff --git a/apps/couch/src/couch_auth_cache.erl b/apps/couch/src/couch_auth_cache.erl index 0264a69d..8b911543 100644 --- a/apps/couch/src/couch_auth_cache.erl +++ b/apps/couch/src/couch_auth_cache.erl @@ -135,6 +135,7 @@ handle_db_event({Event, DbName}) -> case Event of deleted -> gen_server:call(?MODULE, auth_db_deleted, infinity); created -> gen_server:call(?MODULE, auth_db_created, infinity); + compacted -> gen_server:call(?MODULE, auth_db_compacted, infinity); _Else -> ok end; false -> @@ -158,6 +159,14 @@ handle_call(auth_db_created, _From, State) -> true = ets:insert(?STATE, {auth_db, open_auth_db()}), {reply, ok, NewState}; +handle_call(auth_db_compacted, _From, State) -> + exec_if_auth_db( + fun(AuthDb) -> + true = ets:insert(?STATE, {auth_db, reopen_auth_db(AuthDb)}) + end + ), + {reply, ok, State}; + handle_call({new_max_cache_size, NewSize}, _From, State) -> case NewSize >= State#state.cache_size of true -> @@ -175,7 +184,7 @@ handle_call({new_max_cache_size, NewSize}, _From, State) -> end, NewState = State#state{ max_cache_size = NewSize, - cache_size = erlang:min(NewSize, State#state.cache_size) + cache_size = lists:min([NewSize, State#state.cache_size]) }, {reply, ok, NewState}; @@ -338,7 +347,7 @@ cache_needs_refresh() -> reopen_auth_db(AuthDb) -> - case (catch gen_server:call(AuthDb#db.main_pid, get_db, infinity)) of + case (catch couch_db:reopen(AuthDb)) of {ok, AuthDb2} -> AuthDb2; _ -> diff --git a/apps/couch/src/couch_db.erl b/apps/couch/src/couch_db.erl index 0e42980e..7293b9bb 100644 --- a/apps/couch/src/couch_db.erl +++ b/apps/couch/src/couch_db.erl @@ -25,6 +25,7 @@ -export([set_security/2,get_security/1]). -export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]). -export([check_is_admin/1, check_is_reader/1, get_doc_count/1, load_validation_funs/1]). +-export([reopen/1]). -include("couch_db.hrl"). @@ -84,6 +85,17 @@ open(DbName, Options) -> Else -> Else end. +reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) -> + {ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity), + case NewFd =:= Fd of + true -> + {ok, NewDb#db{user_ctx = UserCtx}}; + false -> + erlang:demonitor(OldRef), + NewRef = erlang:monitor(process, NewFd), + {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}} + end. + ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) -> ok = gen_server:call(Pid, full_commit, infinity), {ok, StartTime}. @@ -584,7 +596,8 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI {ok, #full_doc_info{rev_tree=OldTree}} -> NewRevTree = lists:foldl( fun(NewDoc, AccTree) -> - {NewTree, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]), + {NewTree, _} = couch_key_tree:merge(AccTree, + couch_db:doc_to_tree(NewDoc), Db#db.revs_limit), NewTree end, OldTree, Bucket), @@ -845,11 +858,12 @@ flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd -> % already written to our file, nothing to write Att; -flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5}=Att) -> +flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5, + disk_len=InDiskLen} = Att) -> {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} = couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd), check_md5(IdentityMd5, InMd5), - Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=Len}; + Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=InDiskLen}; flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) -> with_stream(Fd, Att, fun(OutputStream) -> @@ -947,7 +961,7 @@ with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) -> write_streamed_attachment(_Stream, _F, 0) -> ok; -write_streamed_attachment(Stream, F, LenLeft) -> +write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 -> Bin = F(), ok = couch_stream:write(Stream, Bin), write_streamed_attachment(Stream, F, LenLeft - size(Bin)). diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl index ab078caf..835d188c 100644 --- a/apps/couch/src/couch_db_updater.erl +++ b/apps/couch/src/couch_db_updater.erl @@ -204,9 +204,11 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> couch_file:delete(RootDir, Filepath), ok = file:rename(CompactFilepath, Filepath), close_db(Db), - ok = gen_server:call(couch_server, {db_updated, NewDb2}, infinity), + NewDb3 = refresh_validate_doc_funs(NewDb2), + ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity), + couch_db_update_notifier:notify({compacted, NewDb3#db.name}), ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]), - {noreply, NewDb2#db{compactor_pid=nil}}; + {noreply, NewDb3#db{compactor_pid=nil}}; false -> ?LOG_INFO("Compaction for ~s still behind main file " "(update seq=~p. compact update seq=~p). Retrying.", @@ -518,16 +520,17 @@ send_result(Client, Id, OriginalRevs, NewResult) -> % used to send a result to the client catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}). -merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) -> +merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) -> {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq}; -merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], +merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList], [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) -> #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq} = OldDocInfo, NewRevTree = lists:foldl( fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) -> if not MergeConflicts -> - case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of + case couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc), + Limit) of {_NewTree, conflicts} when (not OldDeleted) -> send_result(Client, Id, {Pos-1,PrevRevs}, conflict), AccTree; @@ -558,7 +561,7 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], NewDoc#doc{revs={OldPos, [OldRev]}}), NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}}, {NewTree2, _} = couch_key_tree:merge(AccTree, - [couch_db:doc_to_tree(NewDoc2)]), + couch_db:doc_to_tree(NewDoc2), Limit), % we changed the rev id, this tells the caller we did send_result(Client, Id, {Pos-1,PrevRevs}, {ok, {OldPos + 1, NewRevId}}), @@ -572,15 +575,15 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], end; true -> {NewTree, _} = couch_key_tree:merge(AccTree, - [couch_db:doc_to_tree(NewDoc)]), + couch_db:doc_to_tree(NewDoc), Limit), NewTree end end, OldTree, NewDocs), if NewRevTree == OldTree -> % nothing changed - merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos, - AccRemoveSeqs, AccSeq); + merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo, + AccNewInfos, AccRemoveSeqs, AccSeq); true -> % we have updated the document, give it a new seq # NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree}, @@ -588,8 +591,8 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], 0 -> AccRemoveSeqs; _ -> [OldSeq | AccRemoveSeqs] end, - merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, - [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1) + merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo, + [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1) end. @@ -609,7 +612,8 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> #db{ id_tree = DocInfoByIdBTree, seq_tree = DocInfoBySeqBTree, - update_seq = LastSeq + update_seq = LastSeq, + revs_limit = RevsLimit } = Db, Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList], % lookup up the old documents, if they exist. @@ -622,11 +626,9 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> end, Ids, OldDocLookups), % Merge the new docs into the revision trees. - {ok, NewDocInfos0, RemoveSeqs, NewSeq} = merge_rev_trees( + {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit, MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq), - NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0), - % All documents are now ready to write. {ok, Db2} = update_local_docs(Db, NonRepDocs), @@ -794,15 +796,6 @@ copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) -> end, BinInfos), {BodyData, NewBinInfos}. -copy_rev_tree_attachments(SrcDb, DestFd, Tree) -> - couch_key_tree:map( - fun(Rev, {IsDel, Sp, Seq}, leaf) -> - DocBody = copy_doc_attachments(SrcDb, Rev, Sp, DestFd), - {IsDel, DocBody, Seq}; - (_, _, branch) -> - ?REV_MISSING - end, Tree). - merge_lookups(Infos, []) -> Infos; merge_lookups([], _) -> @@ -816,20 +809,19 @@ copy_docs(Db, #db{fd=DestFd}=NewDb, MixedInfos, Retry) -> % lookup any necessary full_doc_infos DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos], LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds), - Infos = merge_lookups(MixedInfos, LookupResults), + % COUCHDB-968, make sure we prune duplicates during compaction + Infos = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) -> + A =< B + end, merge_lookups(MixedInfos, LookupResults)), - % write out the attachments - NewInfos0 = [Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, - DestFd, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- Infos], - - % write out the docs - % we do this in 2 stages so the docs are written out contiguously, making - % view indexing and replication faster. - NewInfos1 = [Info#full_doc_info{rev_tree=couch_key_tree:map_leafs( - fun(_Key, {IsDel, DocBody, Seq}) -> + NewInfos1 = [Info#full_doc_info{rev_tree=couch_key_tree:map( + fun(Rev, {IsDel, Sp, Seq}, leaf) -> + DocBody = copy_doc_attachments(Db, Rev, Sp, DestFd), {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody), - {IsDel, Pos, Seq} - end, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- NewInfos0], + {IsDel, Pos, Seq}; + (_, _, branch) -> + ?REV_MISSING + end, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- Infos], NewInfos = stem_full_doc_infos(Db, NewInfos1), RemoveSeqs = @@ -900,14 +892,19 @@ copy_compact(Db, NewDb0, Retry) -> commit_data(NewDb4#db{update_seq=Db#db.update_seq}). -start_copy_compact(#db{name=Name,filepath=Filepath}=Db) -> +start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) -> CompactFile = Filepath ++ ".compact", ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), case couch_file:open(CompactFile) of {ok, Fd} -> couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>), Retry = true, - {ok, Header} = couch_file:read_header(Fd); + case couch_file:read_header(Fd) of + {ok, Header} -> + ok; + no_valid_header -> + ok = couch_file:write_header(Fd, Header=#db_header{}) + end; {error, enoent} -> couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>), {ok, Fd} = couch_file:open(CompactFile, [create]), @@ -915,8 +912,16 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) -> ok = couch_file:write_header(Fd, Header=#db_header{}) end, NewDb = init_db(Name, CompactFile, Fd, Header), + NewDb2 = if PurgeSeq > 0 -> + {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), + {ok, Pointer} = couch_file:append_term(Fd, PurgedIdsRevs), + NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}}; + true -> + NewDb + end, unlink(Fd), - NewDb2 = copy_compact(Db, NewDb, Retry), - close_db(NewDb2), + + NewDb3 = copy_compact(Db, NewDb2, Retry), + close_db(NewDb3), gen_server:cast(Db#db.main_pid, {compact_done, CompactFile}). diff --git a/apps/couch/src/couch_file.erl b/apps/couch/src/couch_file.erl index 9c06a44e..3e4f29fe 100644 --- a/apps/couch/src/couch_file.erl +++ b/apps/couch/src/couch_file.erl @@ -120,7 +120,19 @@ pread_binary(Fd, Pos) -> pread_iolist(Fd, Pos) -> - gen_server:call(Fd, {pread_iolist, Pos}, infinity). + case gen_server:call(Fd, {pread_iolist, Pos}, infinity) of + {ok, IoList, <<>>} -> + {ok, IoList}; + {ok, IoList, Md5} -> + case couch_util:md5(IoList) of + Md5 -> + {ok, IoList}; + _ -> + exit({file_corruption, <<"file corruption">>}) + end; + Error -> + Error + end. %%---------------------------------------------------------------------- %% Purpose: The length of a file, in bytes. @@ -287,15 +299,10 @@ handle_call({pread_iolist, Pos}, _From, File) -> <<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16), {Md5, IoList} = extract_md5(Md5AndIoList), - case couch_util:md5(IoList) of - Md5 -> - {reply, {ok, IoList}, File}; - _ -> - {stop, file_corruption, {error,file_corruption}, File} - end; + {reply, {ok, IoList, Md5}, File}; <<0:1/integer,Len:31/integer>> -> {Iolist, _} = read_raw_iolist_int(File, NextPos, Len), - {reply, {ok, Iolist}, File} + {reply, {ok, Iolist, <<>>}, File} end; handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) -> {ok, Bin} = file:pread(Fd, Pos, Bytes), diff --git a/apps/couch/src/couch_httpd.erl b/apps/couch/src/couch_httpd.erl index 2b952656..0d9abde6 100644 --- a/apps/couch/src/couch_httpd.erl +++ b/apps/couch/src/couch_httpd.erl @@ -769,7 +769,13 @@ error_headers(#httpd{mochi_req=MochiReq}=Req, Code, ErrorStr, ReasonStr) -> {Code, []}; match -> AuthRedirectBin = ?l2b(AuthRedirect), - UrlReturn = ?l2b(couch_util:url_encode(MochiReq:get(path))), + % Redirect to the path the user requested, not + % the one that is used internally. + UrlReturnRaw = case MochiReq:get_header_value("x-couchdb-vhost-path") of + undefined -> MochiReq:get(path); + VHostPath -> VHostPath + end, + UrlReturn = ?l2b(couch_util:url_encode(UrlReturnRaw)), UrlReason = ?l2b(couch_util:url_encode(ReasonStr)), {302, [{"Location", couch_httpd:absolute_uri(Req, <<AuthRedirectBin/binary,"?return=",UrlReturn/binary,"&reason=",UrlReason/binary>>)}]} end diff --git a/apps/couch/src/couch_httpd_db.erl b/apps/couch/src/couch_httpd_db.erl index cf4e2120..217a2d03 100644 --- a/apps/couch/src/couch_httpd_db.erl +++ b/apps/couch/src/couch_httpd_db.erl @@ -576,7 +576,7 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) -> {ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options), AcceptedTypes = case couch_httpd:header_value(Req, "Accept") of undefined -> []; - AcceptHeader -> string:tokens(AcceptHeader, "; ") + AcceptHeader -> string:tokens(AcceptHeader, ", ") end, case lists:member("multipart/mixed", AcceptedTypes) of false -> @@ -736,34 +736,34 @@ send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) -> JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [attachments, follows|Options])), {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream( - Boundary,JsonBytes, Atts,false), + Boundary,JsonBytes, Atts, true), CType = {<<"Content-Type">>, ContentType}, {ok, Resp} = start_response_length(Req, 200, [CType|Headers], Len), couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts, - fun(Data) -> couch_httpd:send(Resp, Data) end, false) + fun(Data) -> couch_httpd:send(Resp, Data) end, true) end; false -> send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options)) end. -send_docs_multipart(Req, Results, Options) -> +send_docs_multipart(Req, Results, Options1) -> OuterBoundary = couch_uuids:random(), InnerBoundary = couch_uuids:random(), + Options = [attachments, follows, att_encoding_info | Options1], CType = {"Content-Type", "multipart/mixed; boundary=\"" ++ ?b2l(OuterBoundary) ++ "\""}, {ok, Resp} = start_chunked_response(Req, 200, [CType]), couch_httpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>), lists:foreach( fun({ok, #doc{atts=Atts}=Doc}) -> - JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, - [attachments,follows|Options])), + JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)), {ContentType, _Len} = couch_doc:len_doc_to_multi_part_stream( - InnerBoundary, JsonBytes, Atts, false), + InnerBoundary, JsonBytes, Atts, true), couch_httpd:send_chunk(Resp, <<"\r\nContent-Type: ", ContentType/binary, "\r\n\r\n">>), couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts, fun(Data) -> couch_httpd:send_chunk(Resp, Data) - end, false), + end, true), couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>); ({{not_found, missing}, RevId}) -> RevStr = couch_doc:rev_to_str(RevId), @@ -1020,8 +1020,10 @@ db_attachment_req(#httpd{method=Method,mochi_req=MochiReq}=Req, Db, DocId, FileN end end, - #doc{atts=Atts} = Doc, + #doc{atts=Atts, revs = {Pos, Revs}} = Doc, DocEdited = Doc#doc{ + % prune revision list as a workaround for key tree bug (COUCHDB-902) + revs = {Pos, case Revs of [] -> []; [Hd|_] -> [Hd] end}, atts = NewAtt ++ [A || A <- Atts, A#att.name /= FileName] }, {ok, UpdatedRev} = couch_db:update_doc(Db, DocEdited, []), diff --git a/apps/couch/src/couch_httpd_misc_handlers.erl b/apps/couch/src/couch_httpd_misc_handlers.erl index 7b09dccd..7a149d11 100644 --- a/apps/couch/src/couch_httpd_misc_handlers.erl +++ b/apps/couch/src/couch_httpd_misc_handlers.erl @@ -93,10 +93,17 @@ handle_replicate_req(#httpd{method='POST'}=Req) -> {error, not_found} -> send_json(Req, 404, {[{error, not_found}]}); {error, Reason} -> - send_json(Req, 500, {[{error, Reason}]}) + try + send_json(Req, 500, {[{error, Reason}]}) + catch + exit:{json_encode, _} -> + send_json(Req, 500, {[{error, couch_util:to_binary(Reason)}]}) + end catch throw:{db_not_found, Msg} -> - send_json(Req, 404, {[{error, db_not_found}, {reason, Msg}]}) + send_json(Req, 404, {[{error, db_not_found}, {reason, Msg}]}); + throw:{unauthorized, Msg} -> + send_json(Req, 404, {[{error, unauthorized}, {reason, Msg}]}) end; handle_replicate_req(Req) -> send_method_not_allowed(Req, "POST"). diff --git a/apps/couch/src/couch_httpd_rewrite.erl b/apps/couch/src/couch_httpd_rewrite.erl index ca4ac1f0..6c3d0e3c 100644 --- a/apps/couch/src/couch_httpd_rewrite.erl +++ b/apps/couch/src/couch_httpd_rewrite.erl @@ -126,7 +126,10 @@ handle_rewrite_req(#httpd{ case couch_util:get_value(<<"rewrites">>, Props) of undefined -> couch_httpd:send_error(Req, 404, <<"rewrite_error">>, - <<"Invalid path.">>); + <<"Invalid path.">>); + Bin when is_binary(Bin) -> + couch_httpd:send_error(Req, 400, <<"rewrite_error">>, + <<"Rewrite rules are a String. They must be a JSON Array.">>); Rules -> % create dispatch list from rules DispatchList = [make_rule(Rule) || {Rule} <- Rules], diff --git a/apps/couch/src/couch_httpd_view.erl b/apps/couch/src/couch_httpd_view.erl index e1a0dfad..cb387d1b 100644 --- a/apps/couch/src/couch_httpd_view.erl +++ b/apps/couch/src/couch_httpd_view.erl @@ -365,6 +365,8 @@ validate_view_query(group_level, Value, Args) -> end; validate_view_query(inclusive_end, Value, Args) -> Args#view_query_args{inclusive_end=Value}; +validate_view_query(reduce, false, Args) -> + Args; validate_view_query(reduce, _, Args) -> case Args#view_query_args.view_type of map -> diff --git a/apps/couch/src/couch_js_functions.hrl b/apps/couch/src/couch_js_functions.hrl index 1f314f6e..32573a90 100644 --- a/apps/couch/src/couch_js_functions.hrl +++ b/apps/couch/src/couch_js_functions.hrl @@ -31,7 +31,7 @@ throw({forbidden: 'doc.name is required'}); } - if (!(newDoc.roles && (typeof newDoc.roles.length !== 'undefined'))) { + if (newDoc.roles && !isArray(newDoc.roles)) { throw({forbidden: 'doc.roles must be an array'}); } diff --git a/apps/couch/src/couch_key_tree.erl b/apps/couch/src/couch_key_tree.erl index 4fe09bf3..6701da58 100644 --- a/apps/couch/src/couch_key_tree.erl +++ b/apps/couch/src/couch_key_tree.erl @@ -12,104 +12,107 @@ -module(couch_key_tree). --export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]). +-export([merge/3, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]). -export([map/2, get_all_leafs/1, count_leafs/1, remove_leafs/2, get_all_leafs_full/1,stem/2,map_leafs/2]). -% a key tree looks like this: -% Tree -> [] or [{Key, Value, ChildTree} | SiblingTree] -% ChildTree -> Tree -% SiblingTree -> [] or [{SiblingKey, Value, Tree} | Tree] -% And each Key < SiblingKey - +% Tree::term() is really a tree(), but we don't want to require R13B04 yet +-type branch() :: {Key::term(), Value::term(), Tree::term()}. +-type path() :: {Start::pos_integer(), branch()}. +-type tree() :: [branch()]. % sorted by key % partial trees arranged by how much they are cut off. -merge(A, B) -> - {Merged, HasConflicts} = - lists:foldl( - fun(InsertTree, {AccTrees, AccConflicts}) -> - {ok, Merged, Conflicts} = merge_one(AccTrees, InsertTree, [], false), - {Merged, Conflicts or AccConflicts} - end, - {A, false}, B), - if HasConflicts or - ((length(Merged) =/= length(A)) and (length(Merged) =/= length(B))) -> +-spec merge([path()], path(), pos_integer()) -> {[path()], + conflicts | no_conflicts}. +merge(Paths, Path, Depth) -> + {Merged, Conflicts} = merge(Paths, Path), + {stem(Merged, Depth), Conflicts}. + +-spec merge([path()], path()) -> {[path()], conflicts | no_conflicts}. +merge(Paths, Path) -> + {ok, Merged, HasConflicts} = merge_one(Paths, Path, [], false), + if HasConflicts -> + Conflicts = conflicts; + (length(Merged) =/= length(Paths)) and (length(Merged) =/= 1) -> Conflicts = conflicts; true -> Conflicts = no_conflicts end, {lists:sort(Merged), Conflicts}. +-spec merge_one(Original::[path()], Inserted::path(), [path()], bool()) -> + {ok, Merged::[path()], NewConflicts::bool()}. merge_one([], Insert, OutAcc, ConflictsAcc) -> {ok, [Insert | OutAcc], ConflictsAcc}; -merge_one([{Start, Tree}|Rest], {StartInsert, TreeInsert}, OutAcc, ConflictsAcc) -> - if Start =< StartInsert -> - StartA = Start, - StartB = StartInsert, - TreeA = Tree, - TreeB = TreeInsert; - true -> - StartB = Start, - StartA = StartInsert, - TreeB = Tree, - TreeA = TreeInsert - end, - case merge_at([TreeA], StartB - StartA, TreeB) of - {ok, [CombinedTrees], Conflicts} -> - merge_one(Rest, {StartA, CombinedTrees}, OutAcc, Conflicts or ConflictsAcc); +merge_one([{Start, Tree}|Rest], {StartInsert, TreeInsert}, Acc, HasConflicts) -> + case merge_at([Tree], StartInsert - Start, [TreeInsert]) of + {ok, [Merged], Conflicts} -> + MergedStart = lists:min([Start, StartInsert]), + {ok, Rest ++ [{MergedStart, Merged} | Acc], Conflicts or HasConflicts}; no -> - merge_one(Rest, {StartB, TreeB}, [{StartA, TreeA} | OutAcc], ConflictsAcc) + AccOut = [{Start, Tree} | Acc], + merge_one(Rest, {StartInsert, TreeInsert}, AccOut, HasConflicts) end. +-spec merge_at(tree(), Place::integer(), tree()) -> + {ok, Merged::tree(), HasConflicts::bool()} | no. +merge_at(_Ours, _Place, []) -> + no; merge_at([], _Place, _Insert) -> no; -merge_at([{Key, Value, SubTree}|Sibs], 0, {InsertKey, InsertValue, InsertSubTree}) -> - if Key == InsertKey -> - {Merge, Conflicts} = merge_simple(SubTree, InsertSubTree), - {ok, [{Key, Value, Merge} | Sibs], Conflicts}; - true -> - case merge_at(Sibs, 0, {InsertKey, InsertValue, InsertSubTree}) of - {ok, Merged, Conflicts} -> - {ok, [{Key, Value, SubTree} | Merged], Conflicts}; - no -> - no - end - end; -merge_at([{Key, Value, SubTree}|Sibs], Place, Insert) -> - case merge_at(SubTree, Place - 1,Insert) of +merge_at([{Key, Value, SubTree}|Sibs], Place, InsertTree) when Place > 0 -> + % inserted starts later than committed, need to drill into committed subtree + case merge_at(SubTree, Place - 1, InsertTree) of {ok, Merged, Conflicts} -> {ok, [{Key, Value, Merged} | Sibs], Conflicts}; no -> - case merge_at(Sibs, Place, Insert) of + case merge_at(Sibs, Place, InsertTree) of {ok, Merged, Conflicts} -> {ok, [{Key, Value, SubTree} | Merged], Conflicts}; no -> no end + end; +merge_at(OurTree, Place, [{Key, Value, SubTree}]) when Place < 0 -> + % inserted starts earlier than committed, need to drill into insert subtree + case merge_at(OurTree, Place + 1, SubTree) of + {ok, Merged, Conflicts} -> + {ok, [{Key, Value, Merged}], Conflicts}; + no -> + no + end; +merge_at([{Key, Value, SubTree}|Sibs], 0, [{Key, _Value, InsertSubTree}]) -> + {Merged, Conflicts} = merge_simple(SubTree, InsertSubTree), + {ok, [{Key, Value, Merged} | Sibs], Conflicts}; +merge_at([{OurKey, _, _} | _], 0, [{Key, _, _}]) when OurKey > Key -> + % siblings keys are ordered, no point in continuing + no; +merge_at([Tree | Sibs], 0, InsertTree) -> + case merge_at(Sibs, 0, InsertTree) of + {ok, Merged, Conflicts} -> + {ok, [Tree | Merged], Conflicts}; + no -> + no end. % key tree functions + +-spec merge_simple(tree(), tree()) -> {Merged::tree(), NewConflicts::bool()}. merge_simple([], B) -> {B, false}; merge_simple(A, []) -> {A, false}; -merge_simple([ATree | ANextTree], [BTree | BNextTree]) -> - {AKey, AValue, ASubTree} = ATree, - {BKey, _BValue, BSubTree} = BTree, - if - AKey == BKey -> - %same key - {MergedSubTree, Conflict1} = merge_simple(ASubTree, BSubTree), - {MergedNextTree, Conflict2} = merge_simple(ANextTree, BNextTree), - {[{AKey, AValue, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2}; - AKey < BKey -> - {MTree, _} = merge_simple(ANextTree, [BTree | BNextTree]), - {[ATree | MTree], true}; - true -> - {MTree, _} = merge_simple([ATree | ANextTree], BNextTree), - {[BTree | MTree], true} - end. +merge_simple([{Key, Value, SubA} | NextA], [{Key, _, SubB} | NextB]) -> + {MergedSubTree, Conflict1} = merge_simple(SubA, SubB), + {MergedNextTree, Conflict2} = merge_simple(NextA, NextB), + {[{Key, Value, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2}; +merge_simple([{A, _, _} = Tree | Next], [{B, _, _} | _] = Insert) when A < B -> + {Merged, _} = merge_simple(Next, Insert), + {[Tree | Merged], true}; +merge_simple(Ours, [Tree | Next]) -> + {Merged, _} = merge_simple(Ours, Next), + {[Tree | Merged], true}. find_missing(_Tree, []) -> []; @@ -159,7 +162,7 @@ remove_leafs(Trees, Keys) -> fun({PathPos, Path},TreeAcc) -> [SingleTree] = lists:foldl( fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path), - {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]), + {NewTrees, _} = merge(TreeAcc, {PathPos + 1 - length(Path), SingleTree}), NewTrees end, [], FilteredPaths), {NewTree, RemovedKeys}. @@ -321,7 +324,7 @@ stem(Trees, Limit) -> fun({PathPos, Path},TreeAcc) -> [SingleTree] = lists:foldl( fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path), - {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]), + {NewTrees, _} = merge(TreeAcc, {PathPos + 1 - length(Path), SingleTree}), NewTrees end, [], Paths2). diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index 126639e0..c804b49d 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -15,7 +15,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([replicate/2, checkpoint/1, start_link/3]). +-export([replicate/2, checkpoint/1]). -include("couch_db.hrl"). @@ -34,6 +34,7 @@ start_seq, history, + session_id, source_log, target_log, rep_starttime, @@ -46,12 +47,11 @@ committed_seq = 0, stats = nil, - doc_ids = nil + doc_ids = nil, + source_db_update_notifier = nil, + target_db_update_notifier = nil }). -start_link(Id, PostBody, UserCtx) -> - gen_server:start_link(?MODULE, [Id, PostBody, UserCtx], []). - %% convenience function to do a simple replication from the shell replicate(Source, Target) when is_list(Source) -> replicate(?l2b(Source), Target); @@ -64,7 +64,7 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> replicate({Props}=PostBody, UserCtx) -> {BaseId, Extension} = make_replication_id(PostBody, UserCtx), Replicator = {BaseId ++ Extension, - {?MODULE, start_link, [BaseId, PostBody, UserCtx]}, + {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, temporary, 1, worker, @@ -83,15 +83,10 @@ replicate({Props}=PostBody, UserCtx) -> false -> Server = start_replication_server(Replicator), - Continuous = couch_util:get_value(<<"continuous">>, Props, false), - Async = couch_util:get_value(<<"async">>, Props, false), - case {Continuous, Async} of - {true, _} -> + case couch_util:get_value(<<"continuous">>, Props, false) of + true -> {ok, {continuous, ?l2b(BaseId)}}; - {_, true} -> - spawn(fun() -> get_result(Server, PostBody, UserCtx) end), - Server; - _ -> + false -> get_result(Server, PostBody, UserCtx) end end. @@ -113,8 +108,10 @@ get_result(Server, PostBody, UserCtx) -> end. init(InitArgs) -> - try do_init(InitArgs) - catch _:Error -> + try + do_init(InitArgs) + catch + throw:Error -> {stop, Error} end. @@ -199,12 +196,15 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> start_seq = StartSeq, history = History, + session_id = couch_uuids:random(), source_log = SourceLog, target_log = TargetLog, rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - doc_ids = DocIds + doc_ids = DocIds, + source_db_update_notifier = source_db_update_notifier(Source), + target_db_update_notifier = target_db_update_notifier(Target) }, {ok, State}. @@ -212,7 +212,21 @@ handle_call(get_result, From, #state{complete=true, listeners=[]} = State) -> {stop, normal, State#state{listeners=[From]}}; handle_call(get_result, From, State) -> Listeners = State#state.listeners, - {noreply, State#state{listeners=[From|Listeners]}}. + {noreply, State#state{listeners=[From|Listeners]}}; + +handle_call(get_source_db, _From, #state{source = Source} = State) -> + {reply, {ok, Source}, State}; + +handle_call(get_target_db, _From, #state{target = Target} = State) -> + {reply, {ok, Target}, State}. + +handle_cast(reopen_source_db, #state{source = Source} = State) -> + {ok, NewSource} = couch_db:reopen(Source), + {noreply, State#state{source = NewSource}}; + +handle_cast(reopen_target_db, #state{target = Target} = State) -> + {ok, NewTarget} = couch_db:reopen(Target), + {noreply, State#state{target = NewTarget}}; handle_cast(do_checkpoint, State) -> {noreply, do_checkpoint(State)}; @@ -221,16 +235,14 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({missing_revs_checkpoint, SourceSeq}, State) -> - couch_task_status:update("MR Processed source update #~p of ~p", - [SourceSeq, seqnum(State#state.source)]), + couch_task_status:update("MR Processed source update #~p", [SourceSeq]), {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})}; handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State) when SourceSeq > N -> MissingRevs = State#state.missing_revs, ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}), - couch_task_status:update("W Processed source update #~p of ~p", - [SourceSeq, seqnum(State#state.source)]), + couch_task_status:update("W Processed source update #~p", [SourceSeq]), {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})}; handle_info({writer_checkpoint, _}, State) -> {noreply, State}; @@ -239,14 +251,8 @@ handle_info({update_stats, Key, N}, State) -> ets:update_counter(State#state.stats, Key, N), {noreply, State}; -handle_info({'DOWN', _, _, Pid, _}, State) -> - Me = node(), - case erlang:node(Pid) of - Me -> - ?LOG_INFO("replication terminating - local DB is shutting down", []); - Node -> - ?LOG_INFO("replication terminating - DB on ~p is shutting down", [Node]) - end, +handle_info({'DOWN', _, _, _, _}, State) -> + ?LOG_INFO("replication terminating because local DB is shutting down", []), timer:cancel(State#state.checkpoint_scheduled), {stop, shutdown, State}; @@ -293,35 +299,40 @@ code_change(_OldVsn, State, _Extra) -> % internal funs start_replication_server(Replicator) -> - start_replication_server(Replicator, fun start_child/1). - -start_replication_server(Replicator, StartFun) -> - case StartFun(Replicator) of + RepId = element(1, Replicator), + case supervisor:start_child(couch_rep_sup, Replicator) of {ok, Pid} -> + ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]), Pid; {error, already_present} -> - start_replication_server(Replicator, fun restart_child/1); + case supervisor:restart_child(couch_rep_sup, RepId) of + {ok, Pid} -> + ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]), + Pid; + {error, running} -> + %% this error occurs if multiple replicators are racing + %% each other to start and somebody else won. Just grab + %% the Pid by calling start_child again. + {error, {already_started, Pid}} = + supervisor:start_child(couch_rep_sup, Replicator), + ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), + Pid; + {error, {db_not_found, DbUrl}} -> + throw({db_not_found, <<"could not open ", DbUrl/binary>>}); + {error, {unauthorized, DbUrl}} -> + throw({unauthorized, + <<"unauthorized to access database ", DbUrl/binary>>}) + end; {error, {already_started, Pid}} -> + ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), Pid; - {error, running} -> - Children = supervisor:which_children(couch_rep_sup), - {value, {_, Pid, _, _}} = lists:keysearch(Replicator, 1, Children), - Pid; - % sadly both seem to be needed. I don't know why. {error, {{db_not_found, DbUrl}, _}} -> throw({db_not_found, <<"could not open ", DbUrl/binary>>}); - {error, {db_not_found, DbUrl}} -> - throw({db_not_found, <<"could not open ", DbUrl/binary>>}); - {error, {node_not_connected, Node}} -> - throw({node_not_connected, Node}) + {error, {{unauthorized, DbUrl}, _}} -> + throw({unauthorized, + <<"unauthorized to access database ", DbUrl/binary>>}) end. -start_child(Replicator) -> - supervisor:start_child(couch_rep_sup, Replicator). - -restart_child(Replicator) -> - supervisor:restart_child(couch_rep_sup, element(1, Replicator)). - compare_replication_logs(SrcDoc, TgtDoc) -> #doc{body={RepRecProps}} = SrcDoc, #doc{body={RepRecPropsTgt}} = TgtDoc, @@ -373,15 +384,9 @@ close_db(Db) -> couch_db:close(Db). dbname(#http_db{url = Url}) -> - strip_password(Url); -dbname(#db{name = Name, main_pid = MainPid}) -> - ?l2b([Name, " (", pid_to_list(MainPid), ")"]). - -strip_password(Url) -> - re:replace(Url, - "http(s)?://([^:]+):[^@]+@(.*)$", - "http\\1://\\2:*****@\\3", - [{return, list}]). + couch_util:url_strip_password(Url); +dbname(#db{name = Name}) -> + Name. dbinfo(#http_db{} = Db) -> {DbProps} = couch_rep_httpc:request(Db), @@ -445,13 +450,20 @@ do_terminate(State) -> false -> [gen_server:reply(R, retry) || R <- OtherListeners] end, + couch_task_status:update("Finishing"), terminate_cleanup(State). -terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) -> - couch_task_status:update("Finishing"), - close_db(Target), - close_db(Source), - ets:delete(Stats). +terminate_cleanup(State) -> + close_db(State#state.source), + close_db(State#state.target), + stop_db_update_notifier(State#state.source_db_update_notifier), + stop_db_update_notifier(State#state.target_db_update_notifier), + ets:delete(State#state.stats). + +stop_db_update_notifier(nil) -> + ok; +stop_db_update_notifier(Notifier) -> + couch_db_update_notifier:stop(Notifier). has_session_id(_SessionId, []) -> false; @@ -476,12 +488,7 @@ maybe_append_options(Options, Props) -> make_replication_id({Props}, UserCtx) -> %% funky algorithm to preserve backwards compatibility - case couch_util:get_value(<<"use_hostname">>, Props, false) of - true -> - {ok, HostName} = inet:gethostname(); - false -> - HostName = couch_config:get("replication", "hostname", "cloudant.com") - end, + {ok, HostName} = inet:gethostname(), % Port = mochiweb_socket_server:get(couch_httpd, port), Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), @@ -504,22 +511,15 @@ make_replication_id({Props}, UserCtx) -> maybe_add_trailing_slash(Url) -> re:replace(Url, "[^/]$", "&/", [{return, list}]). -get_rep_endpoint(UserCtx, {Props}) -> - case couch_util:get_value(<<"url">>, Props) of +get_rep_endpoint(_UserCtx, {Props}) -> + Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)), + {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}), + {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}), + case couch_util:get_value(<<"oauth">>, Auth) of undefined -> - Node = couch_util:get_value(<<"node">>, Props), - Name = couch_util:get_value(<<"name">>, Props), - {Node, Name, UserCtx}; - RawUrl -> - Url = maybe_add_trailing_slash(RawUrl), - {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}), - {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}), - case couch_util:get_value(<<"oauth">>, Auth) of - undefined -> - {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]}; - {OAuth} -> - {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth} - end + {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]}; + {OAuth} -> + {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth} end; get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) -> {remote, maybe_add_trailing_slash(Url), []}; @@ -533,43 +533,27 @@ open_replication_log(#http_db{}=Db, RepId) -> Req = Db#http_db{resource=couch_util:url_encode(DocId)}, case couch_rep_httpc:request(Req) of {[{<<"error">>, _}, {<<"reason">>, _}]} -> - % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), + ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), #doc{id=?l2b(DocId)}; Doc -> - % ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), + ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), couch_doc:from_json_obj(Doc) end; open_replication_log(Db, RepId) -> DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), case couch_db:open_doc(Db, DocId, []) of {ok, Doc} -> - % ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), + ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), Doc; _ -> - % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), + ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), #doc{id=DocId} end. open_db(Props, UserCtx, ProxyParams) -> open_db(Props, UserCtx, ProxyParams, false). -open_db(<<"http://",_/binary>>=Url, _, ProxyParams, Create) -> - open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create); -open_db(<<"https://",_/binary>>=Url, _, ProxyParams, Create) -> - open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create); -open_db({Props}, UserCtx, ProxyParams, Create) -> - case couch_util:get_value(<<"url">>, Props) of - undefined -> - Node = couch_util:get_value(<<"node">>, Props, node()), - DbName = couch_util:get_value(<<"name">>, Props), - open_local_db(Node, DbName, UserCtx, Create); - _Url -> - open_remote_db({Props}, ProxyParams, Create) - end; -open_db(<<DbName/binary>>, UserCtx, _ProxyParams, Create) -> - open_local_db(node(), DbName, UserCtx, Create). - -open_remote_db({Props}, ProxyParams, CreateTarget) -> +open_db({Props}, _UserCtx, ProxyParams, CreateTarget) -> Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)), {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}), {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}), @@ -580,33 +564,34 @@ open_remote_db({Props}, ProxyParams, CreateTarget) -> auth = AuthProps, headers = lists:ukeymerge(1, Headers, DefaultHeaders) }, - Db = Db1#http_db{options = Db1#http_db.options ++ ProxyParams}, - couch_rep_httpc:db_exists(Db, CreateTarget). - -open_local_db(Node, DbName, UserCtx, Create) when is_binary(Node) -> - try open_local_db(list_to_existing_atom(?b2l(Node)), DbName, UserCtx, Create) - catch error:badarg -> - ?LOG_ERROR("unknown replication node ~s", [Node]), - throw({node_not_connected, Node}) end; -open_local_db(Node, DbName, UserCtx, Create) when is_atom(Node) -> - case catch gen_server:call({couch_server, Node}, {open, DbName, []}, infinity) of - {ok, #db{} = Db} -> - couch_db:monitor(Db), - Db#db{fd_monitor = erlang:monitor(process, Db#db.fd)}; - {ok, MainPid} when is_pid(MainPid) -> - {ok, Db} = couch_db:open_ref_counted(MainPid, UserCtx), - couch_db:monitor(Db), - Db; - {not_found, no_db_file} when Create =:= false-> - throw({db_not_found, DbName}); - {not_found, no_db_file} -> - ok = couch_httpd:verify_is_server_admin(UserCtx), - couch_server:create(DbName, [{user_ctx, UserCtx}]); - {'EXIT', {{nodedown, Node}, _Stack}} -> - throw({node_not_connected, couch_util:to_binary(Node)}); - {'EXIT', {noproc, {gen_server,call,_}}} -> - timer:sleep(1000), - throw({noproc, couch_server, Node}) + Db = Db1#http_db{ + options = Db1#http_db.options ++ ProxyParams ++ + couch_rep_httpc:ssl_options(Db1) + }, + couch_rep_httpc:db_exists(Db, CreateTarget); +open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> + open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); +open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> + open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); +open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) -> + try + case CreateTarget of + true -> + ok = couch_httpd:verify_is_server_admin(UserCtx), + couch_server:create(DbName, [{user_ctx, UserCtx}]); + false -> + ok + end, + + case couch_db:open(DbName, [{user_ctx, UserCtx}]) of + {ok, Db} -> + couch_db:monitor(Db), + Db; + {not_found, no_db_file} -> + throw({db_not_found, DbName}) + end + catch throw:{unauthorized, _} -> + throw({unauthorized, DbName}) end. schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) -> @@ -628,6 +613,7 @@ do_checkpoint(State) -> committed_seq = NewSeqNum, start_seq = StartSeqNum, history = OldHistory, + session_id = SessionId, source_log = SourceLog, target_log = TargetLog, rep_starttime = ReplicationStartTime, @@ -637,14 +623,8 @@ do_checkpoint(State) -> } = State, case commit_to_both(Source, Target, NewSeqNum) of {SrcInstanceStartTime, TgtInstanceStartTime} -> - ?LOG_DEBUG("recording a checkpoint for ~s -> ~s at source update_seq ~p" - " of ~p", [dbname(Source), dbname(Target), NewSeqNum, seqnum(Source)]), - SessionId = couch_uuids:new(), - TargetNode = case Target of #db{main_pid=MainPid} -> - erlang:node(MainPid); - _ -> - http - end, + ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p", + [dbname(Source), dbname(Target), NewSeqNum]), NewHistoryEntry = {[ {<<"session_id">>, SessionId}, {<<"start_time">>, list_to_binary(ReplicationStartTime)}, @@ -663,7 +643,6 @@ do_checkpoint(State) -> NewRepHistory = {[ {<<"session_id">>, SessionId}, {<<"source_last_seq">>, NewSeqNum}, - {<<"target_node">>, TargetNode}, {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} ]}, @@ -683,9 +662,7 @@ do_checkpoint(State) -> "yourself?)", []), State end; - Else -> - ?LOG_INFO("wanted ~p, got ~p from commit_to_both", [ - {SrcInstanceStartTime, TgtInstanceStartTime}, Else]), + _Else -> ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint", [dbname(Source), dbname(Target)]), #state{ @@ -717,31 +694,30 @@ commit_to_both(Source, Target, RequiredSeq) -> {SrcCommitPid, Timestamp} -> Timestamp; {'EXIT', SrcCommitPid, {http_request_failed, _}} -> - nil; - {'EXIT', SrcCommitPid, {noproc, {gen_server, call, [_]}}} -> - nil; % DB crashed, this should trigger a reboot - {'EXIT', SrcCommitPid, Else} -> - ?LOG_ERROR("new error code for crashed replication commit ~p", [Else]), - nil + exit(replication_link_failure) end, {SourceStartTime, TargetStartTime}. ensure_full_commit(#http_db{headers = Headers} = Target) -> + Headers1 = [ + {"Content-Length", 0} | + couch_util:proplist_apply_field( + {"Content-Type", "application/json"}, Headers) + ], Req = Target#http_db{ resource = "_ensure_full_commit", method = post, - headers = couch_util:proplist_apply_field({"Content-Type", "application/json"}, Headers) + headers = Headers1 }, {ResultProps} = couch_rep_httpc:request(Req), true = couch_util:get_value(<<"ok">>, ResultProps), couch_util:get_value(<<"instance_start_time">>, ResultProps); -ensure_full_commit(#db{name=DbName, main_pid=Pid} = Target) -> - TargetNode = erlang:node(Pid), - {ok, NewDb} = rpc:call(TargetNode, couch_db, open_int, [DbName, []]), +ensure_full_commit(Target) -> + {ok, NewDb} = couch_db:open_int(Target#db.name, []), UpdateSeq = couch_db:get_update_seq(Target), CommitSeq = couch_db:get_committed_update_seq(NewDb), InstanceStartTime = NewDb#db.instance_start_time, - catch couch_db:close(NewDb), + couch_db:close(NewDb), if UpdateSeq > CommitSeq -> ?LOG_DEBUG("target needs a full commit: update ~p commit ~p", [UpdateSeq, CommitSeq]), @@ -753,11 +729,16 @@ ensure_full_commit(#db{name=DbName, main_pid=Pid} = Target) -> end. ensure_full_commit(#http_db{headers = Headers} = Source, RequiredSeq) -> + Headers1 = [ + {"Content-Length", 0} | + couch_util:proplist_apply_field( + {"Content-Type", "application/json"}, Headers) + ], Req = Source#http_db{ resource = "_ensure_full_commit", method = post, qs = [{seq, RequiredSeq}], - headers = couch_util:proplist_apply_field({"Content-Type", "application/json"}, Headers) + headers = Headers1 }, {ResultProps} = couch_rep_httpc:request(Req), case couch_util:get_value(<<"ok">>, ResultProps) of @@ -801,11 +782,6 @@ up_to_date(Source, Seq) -> couch_db:close(NewDb), T. -seqnum(#http_db{}) -> - -1; -seqnum(Db) -> - Db#db.update_seq. - parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) -> parse_proxy_params(?b2l(ProxyUrl)); parse_proxy_params([]) -> @@ -820,3 +796,27 @@ parse_proxy_params(ProxyUrl) -> true -> [{proxy_user, User}, {proxy_password, Passwd}] end. + +source_db_update_notifier(#db{name = DbName}) -> + Server = self(), + {ok, Notifier} = couch_db_update_notifier:start_link( + fun({compacted, DbName1}) when DbName1 =:= DbName -> + ok = gen_server:cast(Server, reopen_source_db); + (_) -> + ok + end), + Notifier; +source_db_update_notifier(_) -> + nil. + +target_db_update_notifier(#db{name = DbName}) -> + Server = self(), + {ok, Notifier} = couch_db_update_notifier:start_link( + fun({compacted, DbName1}) when DbName1 =:= DbName -> + ok = gen_server:cast(Server, reopen_target_db); + (_) -> + ok + end), + Notifier; +target_db_update_notifier(_) -> + nil. diff --git a/apps/couch/src/couch_rep_att.erl b/apps/couch/src/couch_rep_att.erl index 476c64d4..72c723e8 100644 --- a/apps/couch/src/couch_rep_att.erl +++ b/apps/couch/src/couch_rep_att.erl @@ -105,7 +105,7 @@ validate_headers(_Req, 200, Headers) -> MochiHeaders = mochiweb_headers:make(Headers), {ok, mochiweb_headers:get_value("Content-Encoding", MochiHeaders)}; validate_headers(Req, Code, Headers) when Code > 299, Code < 400 -> - Url = mochiweb_headers:get_value("Location",mochiweb_headers:make(Headers)), + Url = couch_rep_httpc:redirect_url(Headers, Req#http_db.url), NewReq = couch_rep_httpc:redirected_request(Req, Url), {ibrowse_req_id, ReqId} = couch_rep_httpc:request(NewReq), receive {ibrowse_async_headers, ReqId, NewCode, NewHeaders} -> diff --git a/apps/couch/src/couch_rep_changes_feed.erl b/apps/couch/src/couch_rep_changes_feed.erl index 66696912..032f62a3 100644 --- a/apps/couch/src/couch_rep_changes_feed.erl +++ b/apps/couch/src/couch_rep_changes_feed.erl @@ -43,9 +43,10 @@ next(Server) -> gen_server:call(Server, next_changes, infinity). stop(Server) -> - gen_server:call(Server, stop). + catch gen_server:call(Server, stop), + ok. -init([_Parent, #http_db{}=Source, Since, PostProps] = Args) -> +init([Parent, #http_db{}=Source, Since, PostProps]) -> process_flag(trap_exit, true), Feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of false -> @@ -83,27 +84,32 @@ init([_Parent, #http_db{}=Source, Since, PostProps] = Args) -> resource = "_changes", qs = QS, conn = Pid, - options = [{stream_to, {self(), once}}, {response_format, binary}], + options = [{stream_to, {self(), once}}] ++ + lists:keydelete(inactivity_timeout, 1, Source#http_db.options), headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}] }, {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req), + Args = [Parent, Req, Since, PostProps], receive {ibrowse_async_headers, ReqId, "200", _} -> ibrowse:stream_next(ReqId), {ok, #state{conn=Pid, last_seq=Since, reqid=ReqId, init_args=Args}}; {ibrowse_async_headers, ReqId, Code, Hdrs} when Code=="301"; Code=="302" -> - catch ibrowse:stop_worker_process(Pid), - Url2 = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)), - %% TODO use couch_httpc:request instead of start_http_request - {Pid2, ReqId2} = start_http_request(Url2), + stop_link_worker(Pid), + Url2 = couch_rep_httpc:redirect_url(Hdrs, Req#http_db.url), + Req2 = couch_rep_httpc:redirected_request(Req, Url2), + Pid2 = couch_rep_httpc:spawn_link_worker_process(Req2), + Req3 = Req2#http_db{conn = Pid2}, + {ibrowse_req_id, ReqId2} = couch_rep_httpc:request(Req3), + Args2 = [Parent, Req3, Since, PostProps], receive {ibrowse_async_headers, ReqId2, "200", _} -> - {ok, #state{conn=Pid2, last_seq=Since, reqid=ReqId2, init_args=Args}} + {ok, #state{conn=Pid2, last_seq=Since, reqid=ReqId2, init_args=Args2}} after 30000 -> {stop, changes_timeout} end; {ibrowse_async_headers, ReqId, "404", _} -> - catch ibrowse:stop_worker_process(Pid), + stop_link_worker(Pid), ?LOG_INFO("source doesn't have _changes, trying _all_docs_by_seq", []), Self = self(), BySeqPid = spawn_link(fun() -> by_seq_loop(Self, Source, Since) end), @@ -181,7 +187,7 @@ handle_cast(_Msg, State) -> handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) -> handle_headers(list_to_integer(Code), Hdrs, State); -handle_info({ibrowse_async_response, Id, {error,connection_closed}}, +handle_info({ibrowse_async_response, Id, {error, sel_conn_closed}}, #state{reqid=Id}=State) -> handle_retry(State); @@ -198,16 +204,27 @@ handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) -> handle_info({'EXIT', From, normal}, #state{changes_loop=From} = State) -> handle_feed_completion(State); +handle_info({'EXIT', From, normal}, #state{conn=From, complete=true} = State) -> + {noreply, State}; + handle_info({'EXIT', From, Reason}, #state{changes_loop=From} = State) -> ?LOG_ERROR("changes_loop died with reason ~p", [Reason]), {stop, changes_loop_died, State}; -handle_info({'EXIT', _From, normal}, State) -> - {noreply, State}; +handle_info({'EXIT', From, Reason}, State) -> + ?LOG_ERROR("changes loop, process ~p died with reason ~p", [From, Reason]), + {stop, {From, Reason}, State}; -handle_info(Msg, State) -> - ?LOG_DEBUG("unexpected message at changes_feed ~p", [Msg]), - {noreply, State}. +handle_info(Msg, #state{init_args = InitArgs} = State) -> + case Msg of + changes_timeout -> + [_, #http_db{url = Url} | _] = InitArgs, + ?LOG_ERROR("changes loop timeout, no data received from ~s", + [couch_util:url_strip_password(Url)]); + _ -> + ?LOG_ERROR("changes loop received unexpected message ~p", [Msg]) + end, + {stop, Msg, State}. terminate(_Reason, State) -> #state{ @@ -215,8 +232,7 @@ terminate(_Reason, State) -> conn = Conn } = State, if is_pid(ChangesPid) -> exit(ChangesPid, stop); true -> ok end, - if is_pid(Conn) -> catch ibrowse:stop_worker_process(Conn); true -> ok end, - ok. + stop_link_worker(Conn). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -257,12 +273,17 @@ handle_next_changes(_From, State) -> handle_headers(200, _, State) -> maybe_stream_next(State), {noreply, State}; -handle_headers(301, Hdrs, State) -> - catch ibrowse:stop_worker_process(State#state.conn), - Url = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)), - %% TODO use couch_httpc:request instead of start_http_request - {Pid, ReqId} = start_http_request(Url), - {noreply, State#state{conn=Pid, reqid=ReqId}}; +handle_headers(Code, Hdrs, #state{init_args = InitArgs} = State) + when Code =:= 301 ; Code =:= 302 -> + stop_link_worker(State#state.conn), + [Parent, #http_db{url = Url1} = Source, Since, PostProps] = InitArgs, + Url = couch_rep_httpc:redirect_url(Hdrs, Url1), + Source2 = couch_rep_httpc:redirected_request(Source, Url), + Pid2 = couch_rep_httpc:spawn_link_worker_process(Source2), + Source3 = Source2#http_db{conn = Pid2}, + {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Source3), + InitArgs2 = [Parent, Source3, Since, PostProps], + {noreply, State#state{conn=Pid2, reqid=ReqId, init_args=InitArgs2}}; handle_headers(Code, Hdrs, State) -> ?LOG_ERROR("replicator changes feed failed with code ~s and Headers ~n~p", [Code,Hdrs]), @@ -367,20 +388,15 @@ maybe_stream_next(#state{reqid=nil}) -> ok; maybe_stream_next(#state{complete=false, count=N} = S) when N < ?BUFFER_SIZE -> timer:cancel(get(timeout)), - {ok, Timeout} = timer:exit_after(31000, changes_timeout), + {ok, Timeout} = timer:send_after(31000, changes_timeout), put(timeout, Timeout), ibrowse:stream_next(S#state.reqid); maybe_stream_next(_) -> timer:cancel(get(timeout)). -start_http_request(RawUrl) -> - Url = ibrowse_lib:parse_url(RawUrl), - {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port), - Opts = [ - {stream_to, {self(), once}}, - {inactivity_timeout, 31000}, - {response_format, binary} - ], - {ibrowse_req_id, Id} = - ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity), - {Pid, Id}. +stop_link_worker(Conn) when is_pid(Conn) -> + unlink(Conn), + receive {'EXIT', Conn, _} -> ok after 0 -> ok end, + catch ibrowse:stop_worker_process(Conn); +stop_link_worker(_) -> + ok. diff --git a/apps/couch/src/couch_rep_httpc.erl b/apps/couch/src/couch_rep_httpc.erl index 3b11b869..8153fdcf 100644 --- a/apps/couch/src/couch_rep_httpc.erl +++ b/apps/couch/src/couch_rep_httpc.erl @@ -15,7 +15,8 @@ -include_lib("ibrowse/include/ibrowse.hrl"). -export([db_exists/1, db_exists/2, full_url/1, request/1, redirected_request/2, - spawn_worker_process/1, spawn_link_worker_process/1]). + redirect_url/2, spawn_worker_process/1, spawn_link_worker_process/1]). +-export([ssl_options/1]). request(#http_db{} = Req) -> do_request(Req). @@ -72,6 +73,7 @@ db_exists(Req, CanonicalUrl, CreateDB) -> #http_db{ auth = Auth, headers = Headers0, + options = Options, url = Url } = Req, HeadersFun = fun(Method) -> @@ -84,11 +86,13 @@ db_exists(Req, CanonicalUrl, CreateDB) -> end, case CreateDB of true -> - catch ibrowse:send_req(Url, HeadersFun(put), put); + Headers = [{"Content-Length", 0} | HeadersFun(put)], + catch ibrowse:send_req(Url, Headers, put, [], Options); _Else -> ok end, - case catch ibrowse:send_req(Url, HeadersFun(head), head) of + case catch ibrowse:send_req(Url, HeadersFun(head), head, [], Options) of {ok, "200", _, _} -> + config_http(CanonicalUrl), Req#http_db{url = CanonicalUrl}; {ok, "301", RespHeaders, _} -> RedirectUrl = redirect_url(RespHeaders, Req#http_db.url), @@ -96,11 +100,26 @@ db_exists(Req, CanonicalUrl, CreateDB) -> {ok, "302", RespHeaders, _} -> RedirectUrl = redirect_url(RespHeaders, Req#http_db.url), db_exists(Req#http_db{url = RedirectUrl}, CanonicalUrl); + {ok, "401", _, _} -> + throw({unauthorized, ?l2b(Url)}); Error -> ?LOG_DEBUG("DB at ~s could not be found because ~p", [Url, Error]), throw({db_not_found, ?l2b(Url)}) end. +config_http(Url) -> + #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url), + ok = ibrowse:set_max_sessions(Host, Port, list_to_integer( + couch_config:get("replicator", "max_http_sessions", "20"))), + ok = ibrowse:set_max_pipeline_size(Host, Port, list_to_integer( + couch_config:get("replicator", "max_http_pipeline_size", "50"))), + ok = couch_config:register( + fun("replicator", "max_http_sessions", MaxSessions) -> + ibrowse:set_max_sessions(Host, Port, list_to_integer(MaxSessions)); + ("replicator", "max_http_pipeline_size", PipeSize) -> + ibrowse:set_max_pipeline_size(Host, Port, list_to_integer(PipeSize)) + end). + redirect_url(RespHeaders, OrigUrl) -> MochiHeaders = mochiweb_headers:make(RespHeaders), RedUrl = mochiweb_headers:get_value("Location", MochiHeaders), @@ -167,7 +186,7 @@ process_response({error, Reason}, Req) -> pause = Pause } = Req, ShortReason = case Reason of - connection_closed -> + sel_conn_closed -> connection_closed; {'EXIT', {noproc, _}} -> noproc; @@ -203,8 +222,7 @@ spawn_worker_process(Req) -> Pid. spawn_link_worker_process(Req) -> - Url = ibrowse_lib:parse_url(Req#http_db.url), - {ok, Pid} = ibrowse_http_client:start_link(Url), + {ok, Pid} = ibrowse:spawn_link_worker_process(Req#http_db.url), Pid. maybe_decompress(Headers, Body) -> @@ -243,3 +261,35 @@ oauth_header(Url, QS, Action, Props) -> Params = oauth:signed_params(Method, Url, QSL, Consumer, Token, TokenSecret) -- QSL, {"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}. + +ssl_options(#http_db{url = Url}) -> + case ibrowse_lib:parse_url(Url) of + #url{protocol = https} -> + Depth = list_to_integer( + couch_config:get("replicator", "ssl_certificate_max_depth", "3") + ), + SslOpts = [{depth, Depth} | + case couch_config:get("replicator", "verify_ssl_certificates") of + "true" -> + ssl_verify_options(true); + _ -> + ssl_verify_options(false) + end], + [{is_ssl, true}, {ssl_options, SslOpts}]; + #url{protocol = http} -> + [] + end. + +ssl_verify_options(Value) -> + ssl_verify_options(Value, erlang:system_info(otp_release)). + +ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" -> + CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"), + [{verify, verify_peer}, {cacertfile, CAFile}]; +ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" -> + [{verify, verify_none}]; +ssl_verify_options(true, _OTPVersion) -> + CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"), + [{verify, 2}, {cacertfile, CAFile}]; +ssl_verify_options(false, _OTPVersion) -> + [{verify, 0}]. diff --git a/apps/couch/src/couch_rep_missing_revs.erl b/apps/couch/src/couch_rep_missing_revs.erl index 1eff6774..9809ca5e 100644 --- a/apps/couch/src/couch_rep_missing_revs.erl +++ b/apps/couch/src/couch_rep_missing_revs.erl @@ -24,7 +24,6 @@ -record (state, { changes_loop, changes_from = nil, - target, parent, complete = false, count = 0, @@ -44,11 +43,11 @@ next(Server) -> stop(Server) -> gen_server:call(Server, stop). -init([Parent, Target, ChangesFeed, _PostProps]) -> +init([Parent, _Target, ChangesFeed, _PostProps]) -> process_flag(trap_exit, true), Self = self(), - Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Target) end), - {ok, #state{changes_loop=Pid, target=Target, parent=Parent}}. + Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Parent) end), + {ok, #state{changes_loop=Pid, parent=Parent}}. handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) -> State#state.parent ! {update_stats, missing_revs, length(Revs)}, @@ -133,15 +132,16 @@ handle_changes_loop_exit(normal, State) -> handle_changes_loop_exit(Reason, State) -> {stop, Reason, State#state{changes_loop=nil}}. -changes_loop(OurServer, SourceChangesServer, Target) -> +changes_loop(OurServer, SourceChangesServer, Parent) -> case couch_rep_changes_feed:next(SourceChangesServer) of complete -> exit(normal); Changes -> + {ok, Target} = gen_server:call(Parent, get_target_db, infinity), MissingRevs = get_missing_revs(Target, Changes), gen_server:call(OurServer, {add_missing_revs, MissingRevs}, infinity) end, - changes_loop(OurServer, SourceChangesServer, Target). + changes_loop(OurServer, SourceChangesServer, Parent). get_missing_revs(#http_db{}=Target, Changes) -> Transform = fun({Props}) -> diff --git a/apps/couch/src/couch_rep_reader.erl b/apps/couch/src/couch_rep_reader.erl index 46633994..a7ae45a8 100644 --- a/apps/couch/src/couch_rep_reader.erl +++ b/apps/couch/src/couch_rep_reader.erl @@ -20,12 +20,9 @@ -import(couch_util, [url_encode/1]). -define (BUFFER_SIZE, 1000). --define (MAX_CONCURRENT_REQUESTS, 10). --define (MAX_CONNECTIONS, 20). --define (MAX_PIPELINE_SIZE, 50). +-define (MAX_CONCURRENT_REQUESTS, 100). -include("couch_db.hrl"). --include_lib("ibrowse/include/ibrowse.hrl"). -record (state, { parent, @@ -53,14 +50,9 @@ next(Pid) -> init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) -> process_flag(trap_exit, true), - if is_record(Source, http_db) -> - #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url), - ibrowse:set_max_sessions(Host, Port, ?MAX_CONNECTIONS), - ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE); - true -> ok end, Self = self(), ReaderLoop = spawn_link( - fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end + fun() -> reader_loop(Self, Parent, Source, MissingRevs_or_DocIds) end ), MissingRevs = case MissingRevs_or_DocIds of Pid when is_pid(Pid) -> @@ -230,7 +222,7 @@ update_sequence_lists(Seq, State) -> opened_seqs = Opened }. -open_doc_revs(#http_db{} = DbS, DocId, Revs) -> +open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) -> %% all this logic just splits up revision lists that are too long for %% MochiWeb into multiple requests BaseQS = [{revs,true}, {latest,true}, {att_encoding_info,true}], @@ -246,36 +238,48 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) -> JsonResults = lists:flatten([couch_rep_httpc:request(R) || R <- Requests]), Transform = - fun({[{<<"missing">>, Rev}]}) -> - {{not_found, missing}, couch_doc:parse_rev(Rev)}; - ({[{<<"ok">>, Json}]}) -> + fun({[{<<"ok">>, Json}]}, Acc) -> #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json), - Doc#doc{atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]} + Doc1 = Doc#doc{ + atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts] + }, + [Doc1 | Acc]; + ({ErrorProps}, Acc) -> + Err = couch_util:get_value(<<"error">>, ErrorProps, + ?JSON_ENCODE({ErrorProps})), + ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s", + [DocId, couch_util:url_strip_password(Url), Err]), + Acc end, - [Transform(Result) || Result <- JsonResults]. + lists:reverse(lists:foldl(Transform, [], JsonResults)). -open_doc(#http_db{} = DbS, DocId) -> +open_doc(#http_db{url = Url} = DbS, DocId) -> % get latest rev of the doc Req = DbS#http_db{ resource=url_encode(DocId), qs=[{att_encoding_info, true}] }, - case couch_rep_httpc:request(Req) of - {[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} -> - []; - Json -> + {Props} = Json = couch_rep_httpc:request(Req), + case couch_util:get_value(<<"_id">>, Props) of + Id when is_binary(Id) -> #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json), [Doc#doc{ atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts] - }] + }]; + undefined -> + Err = couch_util:get_value(<<"error">>, Props, ?JSON_ENCODE(Json)), + ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s", + [DocId, couch_util:url_strip_password(Url), Err]), + [] end. -reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) -> - case Source of +reader_loop(ReaderServer, Parent, Source1, DocIds) when is_list(DocIds) -> + case Source1 of #http_db{} -> [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil}, infinity) || Id <- DocIds]; _LocalDb -> + {ok, Source} = gen_server:call(Parent, get_source_db, infinity), Docs = lists:foldr(fun(Id, Acc) -> case couch_db:open_doc(Source, Id) of {ok, Doc} -> @@ -288,7 +292,7 @@ reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) -> end, exit(complete); -reader_loop(ReaderServer, Source, MissingRevsServer) -> +reader_loop(ReaderServer, Parent, Source, MissingRevsServer) -> case couch_rep_missing_revs:next(MissingRevsServer) of complete -> exit(complete); @@ -301,22 +305,23 @@ reader_loop(ReaderServer, Source, MissingRevsServer) -> #http_db{} -> [gen_server:call(ReaderServer, {open_remote_doc, Id, Seq, Revs}, infinity) || {Id,Seq,Revs} <- SortedIdsRevs], - reader_loop(ReaderServer, Source, MissingRevsServer); + reader_loop(ReaderServer, Parent, Source, MissingRevsServer); _Local -> - Source2 = maybe_reopen_db(Source, HighSeq), + {ok, Source1} = gen_server:call(Parent, get_source_db, infinity), + Source2 = maybe_reopen_db(Source1, HighSeq), lists:foreach(fun({Id,Seq,Revs}) -> {ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]), JustTheDocs = [Doc || {ok, Doc} <- Docs], gen_server:call(ReaderServer, {add_docs, Seq, JustTheDocs}, infinity) end, SortedIdsRevs), - reader_loop(ReaderServer, Source2, MissingRevsServer) + couch_db:close(Source2), + reader_loop(ReaderServer, Parent, Source2, MissingRevsServer) end end. maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq -> {ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]), - couch_db:close(Db), NewDb; maybe_reopen_db(Db, _HighSeq) -> Db. diff --git a/apps/couch/src/couch_rep_writer.erl b/apps/couch/src/couch_rep_writer.erl index dd6396fd..cf98ccfb 100644 --- a/apps/couch/src/couch_rep_writer.erl +++ b/apps/couch/src/couch_rep_writer.erl @@ -16,10 +16,10 @@ -include("couch_db.hrl"). -start_link(Parent, Target, Reader, _PostProps) -> - {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}. +start_link(Parent, _Target, Reader, _PostProps) -> + {ok, spawn_link(fun() -> writer_loop(Parent, Reader) end)}. -writer_loop(Parent, Reader, Target) -> +writer_loop(Parent, Reader) -> case couch_rep_reader:next(Reader) of {complete, nil} -> ok; @@ -28,6 +28,7 @@ writer_loop(Parent, Reader, Target) -> ok; {HighSeq, Docs} -> DocCount = length(Docs), + {ok, Target} = gen_server:call(Parent, get_target_db, infinity), try write_docs(Target, Docs) of {ok, []} -> Parent ! {update_stats, docs_written, DocCount}; @@ -48,7 +49,7 @@ writer_loop(Parent, Reader, Target) -> end, couch_rep_att:cleanup(), couch_util:should_flush(), - writer_loop(Parent, Reader, Target) + writer_loop(Parent, Reader) end. write_docs(#http_db{} = Db, Docs) -> diff --git a/apps/couch/src/couch_util.erl b/apps/couch/src/couch_util.erl index 3a6e92c5..adcb4450 100644 --- a/apps/couch/src/couch_util.erl +++ b/apps/couch/src/couch_util.erl @@ -27,6 +27,7 @@ -export([get_value/2, get_value/3]). -export([md5/1, md5_init/0, md5_update/2, md5_final/1]). -export([reorder_results/2]). +-export([url_strip_password/1]). -include("couch_db.hrl"). -include_lib("kernel/include/file.hrl"). @@ -417,8 +418,8 @@ compressible_att_type(MimeType) -> ), lists:any( fun(TypeExp) -> - Regexp = "^\\s*" ++ - re:replace(TypeExp, "\\*", ".*", [{return, list}]) ++ "\\s*$", + Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"), + "(?:\\s*;.*?)?\\s*", $$], case re:run(MimeType, Regexp, [caseless]) of {match, _} -> true; @@ -452,3 +453,9 @@ reorder_results(Keys, SortedResults) when length(Keys) < 100 -> reorder_results(Keys, SortedResults) -> KeyDict = dict:from_list(SortedResults), [dict:fetch(Key, KeyDict) || Key <- Keys]. + +url_strip_password(Url) -> + re:replace(Url, + "http(s)?://([^:]+):[^@]+@(.*)$", + "http\\1://\\2:*****@\\3", + [{return, list}]). diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl index 9de86b82..8dca17da 100644 --- a/apps/couch/src/couch_view.erl +++ b/apps/couch/src/couch_view.erl @@ -30,11 +30,9 @@ start_link() -> gen_server:start_link({local, couch_view}, couch_view, [], []). get_temp_updater(DbName, Language, DesignOptions, MapSrc, RedSrc) -> - % make temp group - % do we need to close this db? - {ok, _Db, Group} = + {ok, Group} = couch_view_group:open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc), - case gen_server:call(couch_view, {get_group_server, DbName, Group}) of + case gen_server:call(couch_view, {get_group_server, DbName, Group}, infinity) of {ok, Pid} -> Pid; Error -> @@ -42,10 +40,9 @@ get_temp_updater(DbName, Language, DesignOptions, MapSrc, RedSrc) -> end. get_group_server(DbName, GroupId) -> - % get signature for group case couch_view_group:open_db_group(DbName, GroupId) of {ok, Group} -> - case gen_server:call(couch_view, {get_group_server, DbName, Group}) of + case gen_server:call(couch_view, {get_group_server, DbName, Group}, infinity) of {ok, Pid} -> Pid; Error -> diff --git a/apps/couch/src/couch_view_compactor.erl b/apps/couch/src/couch_view_compactor.erl index f56325a4..43db9036 100644 --- a/apps/couch/src/couch_view_compactor.erl +++ b/apps/couch/src/couch_view_compactor.erl @@ -53,18 +53,22 @@ compact_group(Group, EmptyGroup) -> TaskName = <<DbName/binary, ShortName/binary>>, couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>), - Fun = fun(KV, {Bt, Acc, TotalCopied}) -> + Fun = fun({DocId, _ViewIdKeys} = KV, {Bt, Acc, TotalCopied, LastId}) -> + if DocId =:= LastId -> % COUCHDB-999 + Msg = "Duplicates of ~s detected in ~s ~s - rebuild required", + exit(io_lib:format(Msg, [DocId, DbName, GroupId])); + true -> ok end, if TotalCopied rem 10000 =:= 0 -> couch_task_status:update("Copied ~p of ~p Ids (~p%)", [TotalCopied, Count, (TotalCopied*100) div Count]), {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])), - {ok, {Bt2, [], TotalCopied+1}}; + {ok, {Bt2, [], TotalCopied+1, DocId}}; true -> - {ok, {Bt, [KV|Acc], TotalCopied+1}} + {ok, {Bt, [KV|Acc], TotalCopied+1, DocId}} end end, - {ok, _, {Bt3, Uncopied, _Total}} = couch_btree:foldl(IdBtree, Fun, - {EmptyIdBtree, [], 0}), + {ok, _, {Bt3, Uncopied, _Total, _LastId}} = couch_btree:foldl(IdBtree, Fun, + {EmptyIdBtree, [], 0, nil}), {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)), NewViews = lists:map(fun({View, EmptyView}) -> diff --git a/apps/couch/src/couch_view_group.erl b/apps/couch/src/couch_view_group.erl index 377e7516..730db185 100644 --- a/apps/couch/src/couch_view_group.erl +++ b/apps/couch/src/couch_view_group.erl @@ -74,7 +74,7 @@ start_link(InitArgs) -> end. % init creates a closure which spawns the appropriate view_updater. -init({{_, DbName, _}=InitArgs, ReturnPid, Ref}) -> +init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), case prepare_group(InitArgs, false) of {ok, #group{fd=Fd, current_seq=Seq}=Group} -> @@ -86,12 +86,9 @@ init({{_, DbName, _}=InitArgs, ReturnPid, Ref}) -> ignore; _ -> try couch_db:monitor(Db) after couch_db:close(Db) end, - Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), {ok, #group_state{ db_name= DbName, init_args=InitArgs, - updater_pid = Pid, group=Group#group{dbname=DbName}, ref_counter=erlang:monitor(process,Fd)}} end; @@ -178,6 +175,7 @@ handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, group = #group{name=GroupId, fd=OldFd, sig=GroupSig}, init_args = {RootDir, DbName, _}, updater_pid = UpdaterPid, + compactor_pid = CompactorPid, ref_counter = RefCounter } = State, @@ -199,6 +197,8 @@ handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, end, %% cleanup old group + unlink(CompactorPid), + receive {'EXIT', CompactorPid, normal} -> ok after 0 -> ok end, unlink(OldFd), erlang:demonitor(RefCounter), @@ -426,8 +426,8 @@ open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) -> def=MapSrc, reduce_funs= if RedSrc==[] -> []; true -> [{<<"_temp">>, RedSrc}] end, options=DesignOptions}, - - {ok, Db, set_view_sig(#group{name = <<"_temp">>, views=[View], + couch_db:close(Db), + {ok, set_view_sig(#group{name = <<"_temp">>, views=[View], def_lang=Language, design_options=DesignOptions})}; Error -> Error |