diff options
author | Damien F. Katz <damien@apache.org> | 2008-04-23 00:25:23 +0000 |
---|---|---|
committer | Damien F. Katz <damien@apache.org> | 2008-04-23 00:25:23 +0000 |
commit | 689f9830b50ac6b7a673ce467626c6d2deef645c (patch) | |
tree | 2a321ba63b940883d1b4373a34dc181709c4e92a /src | |
parent | 6949f81ae419c5d10131c71732ca0637ef7d234d (diff) |
Replicator optmizations and fix for unnecessary document copy during re-replication
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@650705 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch.app.tpl.in | 1 | ||||
-rw-r--r-- | src/couchdb/couch_db.erl | 73 | ||||
-rw-r--r-- | src/couchdb/couch_httpd.erl | 4 | ||||
-rw-r--r-- | src/couchdb/couch_key_tree.erl | 2 | ||||
-rw-r--r-- | src/couchdb/couch_rep.erl | 114 | ||||
-rw-r--r-- | src/couchdb/couch_server_sup.erl | 7 | ||||
-rw-r--r-- | src/couchdb/couch_util.erl | 56 |
7 files changed, 132 insertions, 125 deletions
diff --git a/src/couchdb/couch.app.tpl.in b/src/couchdb/couch.app.tpl.in index 3abc6302..96e5afc1 100644 --- a/src/couchdb/couch.app.tpl.in +++ b/src/couchdb/couch.app.tpl.in @@ -21,7 +21,6 @@ couch_rep]}, {registered,[couch_server, couch_server_sup, - couch_util, couch_view, couch_query_servers, couch_ft_query]}, diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 289cc4f9..b45a1ff4 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -133,7 +133,8 @@ open_doc(MainPid, Id, Options) -> end. open_doc_revs(MainPid, Id, Revs, Options) -> - open_doc_revs_int(get_db(MainPid), Id, Revs, Options). + [Result] = open_doc_revs_int(get_db(MainPid), [{Id, Revs}], Options), + Result. get_missing_revs(MainPid, IdRevsList) -> Ids = [Id1 || {Id1, _Revs} <- IdRevsList], @@ -148,7 +149,9 @@ get_missing_revs(MainPid, IdRevsList) -> end end, IdRevsList, FullDocInfoResults), - {ok, Results}. + % strip out the non-missing ids + Missing = [{Id, Revs} || {Id, Revs} <- Results, Revs /= []], + {ok, Missing}. get_doc_info(Db, Id) -> case get_full_doc_info(Db, Id) of @@ -563,38 +566,44 @@ get_db(MainPid) -> {ok, Db} = gen_server:call(MainPid, get_db), Db. -open_doc_revs_int(Db, Id, Revs, Options) -> - case get_full_doc_info(Db, Id) of - {ok, #full_doc_info{rev_tree=RevTree}} -> - {FoundRevs, MissingRevs} = - case Revs of - all -> - {couch_key_tree:get_all_leafs(RevTree), []}; - _ -> - case lists:member(latest, Options) of - true -> - couch_key_tree:get_key_leafs(RevTree, Revs); - false -> - couch_key_tree:get(RevTree, Revs) +open_doc_revs_int(Db, IdRevs, Options) -> + Ids = [Id || {Id, _Revs} <- IdRevs], + LookupResults = get_full_doc_infos(Db, Ids), + lists:zipwith( + fun({Id, Revs}, Lookup) -> + case Lookup of + {ok, #full_doc_info{rev_tree=RevTree}} -> + {FoundRevs, MissingRevs} = + case Revs of + all -> + {couch_key_tree:get_all_leafs(RevTree), []}; + _ -> + case lists:member(latest, Options) of + true -> + couch_key_tree:get_key_leafs(RevTree, Revs); + false -> + couch_key_tree:get(RevTree, Revs) + end + end, + FoundResults = + lists:map(fun({Rev, Value, FoundRevPath}) -> + case Value of + ?REV_MISSING -> + % we have the rev in our list but know nothing about it + {{not_found, missing}, Rev}; + {IsDeleted, SummaryPtr} -> + {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)} + end + end, FoundRevs), + Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs], + {ok, Results}; + not_found when Revs == all -> + {ok, []}; + not_found -> + {ok, [{{not_found, missing}, Rev} || Rev <- Revs]} end end, - FoundResults = - lists:map(fun({Rev, Value, FoundRevPath}) -> - case Value of - ?REV_MISSING -> - % we have the rev in our list but know nothing about it - {{not_found, missing}, Rev}; - {IsDeleted, SummaryPtr} -> - {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)} - end - end, FoundRevs), - Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs], - {ok, Results}; - not_found when Revs == all -> - {ok, []}; - not_found -> - {ok, [{{not_found, missing}, Rev} || Rev <- Revs]} - end. + IdRevs, LookupResults). open_doc_int(Db, ?LOCAL_DOC_PREFIX ++ _ = Id, _Options) -> case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl index 9351dc89..95693262 100644 --- a/src/couchdb/couch_httpd.erl +++ b/src/couchdb/couch_httpd.erl @@ -76,9 +76,9 @@ handle_request(Req, DocumentRoot) -> ]). handle_request(Req, DocumentRoot, Method, Path) -> - Start = erlang:now(), + % Start = erlang:now(), X = handle_request0(Req, DocumentRoot, Method, Path), - io:format("now_diff:~p~n", [timer:now_diff(erlang:now(), Start)]), + % io:format("now_diff:~p~n", [timer:now_diff(erlang:now(), Start)]), X. handle_request0(Req, DocumentRoot, Method, Path) -> diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl index 705365bd..80a7005f 100644 --- a/src/couchdb/couch_key_tree.erl +++ b/src/couchdb/couch_key_tree.erl @@ -52,7 +52,7 @@ find_missing(_Tree, []) -> find_missing([], Keys) -> Keys; find_missing([{Key, _, SubTree} | RestTree], Keys) -> - SrcKeys2 = Keys -- Key, + SrcKeys2 = Keys -- [Key], SrcKeys3 = find_missing(SubTree, SrcKeys2), find_missing(RestTree, SrcKeys3). diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index bead047c..f6fd0fad 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -16,14 +16,6 @@ -export([replicate/2, replicate/3]). --record(stats, { - docs_read=0, - read_errors=0, - docs_copied=0, - copy_errors=0 - }). - - url_encode([H|T]) -> if H >= $a, $z >= H -> @@ -87,7 +79,8 @@ replicate(Source, Target, Options) -> false -> SeqNum0 end, - {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum, #stats{}), + {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum), + case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of true -> % nothing changed, don't record results @@ -98,11 +91,7 @@ replicate(Source, Target, Options) -> [{"start_time", StartTime}, {"end_time", httpd_util:rfc1123_date()}, {"start_last_seq", SeqNum}, - {"end_last_seq", NewSeqNum}, - {"docs_read", Stats#stats.docs_read}, - {"read_errors", Stats#stats.read_errors}, - {"docs_copied", Stats#stats.docs_copied}, - {"copy_errors", Stats#stats.copy_errors}]} + {"end_last_seq", NewSeqNum} | Stats]} | tuple_to_list(proplists:get_value("history", OldRepHistoryProps, {}))], % something changed, record results NewRepHistory = @@ -116,34 +105,89 @@ replicate(Source, Target, Options) -> {ok, NewRepHistory} end. -pull_rep(DbTarget, DbSource, SourceSeqNum, Stats) -> +pull_rep(DbTarget, DbSource, SourceSeqNum) -> + Parent = self(), + SaveDocsPid = spawn_link(fun() -> + save_docs_loop(Parent, DbTarget, 0) end), + OpenDocsPid = spawn_link(fun() -> + open_doc_revs_loop(Parent, DbSource, SaveDocsPid, 0) end), + MissingRevsPid = spawn_link(fun() -> + get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, 0, 0) end), {ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum, - fun(#doc_info{update_seq=Seq}=SrcDocInfo, _, {_, AccStats}) -> - Stats2 = maybe_save_docs(DbTarget, DbSource, SrcDocInfo, AccStats), - {ok, {Seq, Stats2}} - end, {SourceSeqNum, Stats}), - NewSeq. - + fun(SrcDocInfo, _, _) -> + #doc_info{id=Id, + rev=Rev, + conflict_revs=Conflicts, + deleted_conflict_revs=DelConflicts, + update_seq=Seq} = SrcDocInfo, + SrcRevs = [Rev | Conflicts] ++ DelConflicts, + MissingRevsPid ! {Id, SrcRevs}, % send to the missing revs process + {ok, Seq} + end, SourceSeqNum), + MissingRevsPid ! shutdown, + receive {done, MissingRevsPid, Stats1} -> ok end, + + OpenDocsPid ! shutdown, + receive {done, OpenDocsPid, Stats2} -> ok end, + + SaveDocsPid ! shutdown, + receive {done, SaveDocsPid, Stats3} -> ok end, + + {NewSeq, Stats1 ++ Stats2 ++ Stats3}. + + +receive_id_revs() -> + receive + {Id, Revs} -> + [{Id, Revs} | receive_id_revs()] + after 1 -> + [] + end. -maybe_save_docs(DbTarget, DbSource, - #doc_info{id=Id, rev=Rev, conflict_revs=Conflicts, deleted_conflict_revs=DelConflicts}, - Stats) -> - SrcRevs = [Rev | Conflicts] ++ DelConflicts, - {ok, [{Id, MissingRevs}]} = get_missing_revs(DbTarget, [{Id, SrcRevs}]), +get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, RevsChecked, MissingFound) -> + receive + {Id, Revs} -> + Changed = [{Id, Revs} | receive_id_revs()], + {ok, Missing} = get_missing_revs(DbTarget, Changed), + [OpenDocsPid ! {Id0, MissingRevs} || {Id0, MissingRevs} <- Missing], + get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, + RevsChecked + length(Changed), + MissingFound + length(Missing)); + shutdown -> + Parent ! {done, self(), [{missing_checked, RevsChecked}, + {missing_found, MissingFound}]} + end. + - case MissingRevs of - [] -> - Stats; - _Else -> +open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead) -> + receive + {Id, MissingRevs} -> {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]), % only save successful reads Docs = [RevDoc || {ok, RevDoc} <- DocResults], - ok = save_docs(DbTarget, Docs, []), + SaveDocsPid ! Docs, + open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead + length(Docs)); + shutdown -> + Parent ! {done, self(), [{docs_read, DocsRead}]} + end. + - Stats#stats{ - docs_read=Stats#stats.docs_read + length(Docs), - read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs), - docs_copied=Stats#stats.docs_copied + length(Docs)} +receive_docs() -> + receive + Docs when is_list(Docs) -> + Docs ++ receive_docs() + after 1 -> + [] + end. + +save_docs_loop(Parent, DbTarget, DocsWritten) -> + receive + Docs0 when is_list(Docs0) -> + Docs = Docs0 ++ receive_docs(), + ok = save_docs(DbTarget, Docs, []), + save_docs_loop(Parent, DbTarget, DocsWritten + length(Docs)); + shutdown -> + Parent ! {done, self(), [{docs_written, DocsWritten}]} end. diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl index 6fed6223..22318154 100644 --- a/src/couchdb/couch_server_sup.erl +++ b/src/couchdb/couch_server_sup.erl @@ -97,12 +97,6 @@ start_server(InputIniFilename) -> brutal_kill, worker, [couch_server]}, - {couch_util, - {couch_util, start_link, [UtilDriverDir]}, - permanent, - brutal_kill, - worker, - [couch_util]}, {couch_query_servers, {couch_query_servers, start_link, [QueryServers]}, permanent, @@ -146,6 +140,7 @@ start_server(InputIniFilename) -> io:format("couch ~s (LogLevel=~s)~n", [couch_server:get_version(), LogLevel]), io:format("~s~n", [ConsoleStartupMsg]), + couch_util:start_driver(UtilDriverDir), % ensure these applications are running application:start(inets), diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl index 94648b0d..0c51f39b 100644 --- a/src/couchdb/couch_util.erl +++ b/src/couchdb/couch_util.erl @@ -11,32 +11,25 @@ % the License. -module(couch_util). --behaviour(gen_server). --export([start_link/0,start_link/1]). +-export([start_driver/1]). -export([parse_ini/1,should_flush/0, should_flush/1]). -export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]). --export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1, test/0]). +-export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1]). -export([encodeBase64/1, decodeBase64/1]). --export([init/1, terminate/2, handle_call/3]). --export([handle_cast/2,code_change/3,handle_info/2]). % arbitrarily chosen amount of memory to use before flushing to disk -define(FLUSH_MAX_MEM, 10000000). -start_link() -> - start_link(""). - -start_link("") -> - start_link(filename:join(code:priv_dir(couch), "lib")); -start_link(LibDir) -> +start_driver("") -> + start_driver(filename:join(code:priv_dir(couch), "lib")); +start_driver(LibDir) -> case erl_ddll:load_driver(LibDir, "couch_erl_driver") of ok -> ok; {error, already_loaded} -> ok; - {error, ErrorDesc} -> exit({error, ErrorDesc}) - end, - gen_server:start_link({local, couch_util}, couch_util, [], []). + Error -> exit(Error) + end. new_uuid() -> @@ -45,9 +38,7 @@ new_uuid() -> to_hex([]) -> []; to_hex([H|T]) -> - Digit1 = H div 16, - Digit2 = H rem 16, - [to_digit(Digit1), to_digit(Digit2) | to_hex(T)]. + [to_digit(H div 16), to_digit(H rem 16) | to_hex(T)]. to_digit(N) when N < 10 -> $0 + N; @@ -195,33 +186,6 @@ parse_ini(FileContents) -> end, {"", []}, Lines), {ok, lists:reverse(ParsedIniValues)}. -init([]) -> - {A,B,C} = erlang:now(), - random:seed(A,B,C), - {ok, dummy_server}. - -terminate(_Reason, _Server) -> - ok. - -handle_call(rand32, _From, Server) -> - {reply, rand32_int(), Server}. - -handle_cast(_Msg, State) -> - {noreply,State}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -handle_info(_Info, State) -> - {noreply, State}. - - - - - -rand32_int() -> - random:uniform(16#FFFFFFFF + 1) - 1. - drv_port() -> case get(couch_drv_port) of undefined -> @@ -331,7 +295,3 @@ dec(C) -> 62*?st(C,43) + ?st(C,47) + (C-59)*?st(C,48) - 69*?st(C,65) - 6*?st(C,97). - -test() -> - start_link("debug"), - collate("a","b",[]). |