summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2008-04-23 00:25:23 +0000
committerDamien F. Katz <damien@apache.org>2008-04-23 00:25:23 +0000
commit689f9830b50ac6b7a673ce467626c6d2deef645c (patch)
tree2a321ba63b940883d1b4373a34dc181709c4e92a /src
parent6949f81ae419c5d10131c71732ca0637ef7d234d (diff)
Replicator optmizations and fix for unnecessary document copy during re-replication
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@650705 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch.app.tpl.in1
-rw-r--r--src/couchdb/couch_db.erl73
-rw-r--r--src/couchdb/couch_httpd.erl4
-rw-r--r--src/couchdb/couch_key_tree.erl2
-rw-r--r--src/couchdb/couch_rep.erl114
-rw-r--r--src/couchdb/couch_server_sup.erl7
-rw-r--r--src/couchdb/couch_util.erl56
7 files changed, 132 insertions, 125 deletions
diff --git a/src/couchdb/couch.app.tpl.in b/src/couchdb/couch.app.tpl.in
index 3abc6302..96e5afc1 100644
--- a/src/couchdb/couch.app.tpl.in
+++ b/src/couchdb/couch.app.tpl.in
@@ -21,7 +21,6 @@
couch_rep]},
{registered,[couch_server,
couch_server_sup,
- couch_util,
couch_view,
couch_query_servers,
couch_ft_query]},
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 289cc4f9..b45a1ff4 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -133,7 +133,8 @@ open_doc(MainPid, Id, Options) ->
end.
open_doc_revs(MainPid, Id, Revs, Options) ->
- open_doc_revs_int(get_db(MainPid), Id, Revs, Options).
+ [Result] = open_doc_revs_int(get_db(MainPid), [{Id, Revs}], Options),
+ Result.
get_missing_revs(MainPid, IdRevsList) ->
Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
@@ -148,7 +149,9 @@ get_missing_revs(MainPid, IdRevsList) ->
end
end,
IdRevsList, FullDocInfoResults),
- {ok, Results}.
+ % strip out the non-missing ids
+ Missing = [{Id, Revs} || {Id, Revs} <- Results, Revs /= []],
+ {ok, Missing}.
get_doc_info(Db, Id) ->
case get_full_doc_info(Db, Id) of
@@ -563,38 +566,44 @@ get_db(MainPid) ->
{ok, Db} = gen_server:call(MainPid, get_db),
Db.
-open_doc_revs_int(Db, Id, Revs, Options) ->
- case get_full_doc_info(Db, Id) of
- {ok, #full_doc_info{rev_tree=RevTree}} ->
- {FoundRevs, MissingRevs} =
- case Revs of
- all ->
- {couch_key_tree:get_all_leafs(RevTree), []};
- _ ->
- case lists:member(latest, Options) of
- true ->
- couch_key_tree:get_key_leafs(RevTree, Revs);
- false ->
- couch_key_tree:get(RevTree, Revs)
+open_doc_revs_int(Db, IdRevs, Options) ->
+ Ids = [Id || {Id, _Revs} <- IdRevs],
+ LookupResults = get_full_doc_infos(Db, Ids),
+ lists:zipwith(
+ fun({Id, Revs}, Lookup) ->
+ case Lookup of
+ {ok, #full_doc_info{rev_tree=RevTree}} ->
+ {FoundRevs, MissingRevs} =
+ case Revs of
+ all ->
+ {couch_key_tree:get_all_leafs(RevTree), []};
+ _ ->
+ case lists:member(latest, Options) of
+ true ->
+ couch_key_tree:get_key_leafs(RevTree, Revs);
+ false ->
+ couch_key_tree:get(RevTree, Revs)
+ end
+ end,
+ FoundResults =
+ lists:map(fun({Rev, Value, FoundRevPath}) ->
+ case Value of
+ ?REV_MISSING ->
+ % we have the rev in our list but know nothing about it
+ {{not_found, missing}, Rev};
+ {IsDeleted, SummaryPtr} ->
+ {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
+ end
+ end, FoundRevs),
+ Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
+ {ok, Results};
+ not_found when Revs == all ->
+ {ok, []};
+ not_found ->
+ {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
end
end,
- FoundResults =
- lists:map(fun({Rev, Value, FoundRevPath}) ->
- case Value of
- ?REV_MISSING ->
- % we have the rev in our list but know nothing about it
- {{not_found, missing}, Rev};
- {IsDeleted, SummaryPtr} ->
- {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
- end
- end, FoundRevs),
- Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
- {ok, Results};
- not_found when Revs == all ->
- {ok, []};
- not_found ->
- {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
- end.
+ IdRevs, LookupResults).
open_doc_int(Db, ?LOCAL_DOC_PREFIX ++ _ = Id, _Options) ->
case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl
index 9351dc89..95693262 100644
--- a/src/couchdb/couch_httpd.erl
+++ b/src/couchdb/couch_httpd.erl
@@ -76,9 +76,9 @@ handle_request(Req, DocumentRoot) ->
]).
handle_request(Req, DocumentRoot, Method, Path) ->
- Start = erlang:now(),
+ % Start = erlang:now(),
X = handle_request0(Req, DocumentRoot, Method, Path),
- io:format("now_diff:~p~n", [timer:now_diff(erlang:now(), Start)]),
+ % io:format("now_diff:~p~n", [timer:now_diff(erlang:now(), Start)]),
X.
handle_request0(Req, DocumentRoot, Method, Path) ->
diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl
index 705365bd..80a7005f 100644
--- a/src/couchdb/couch_key_tree.erl
+++ b/src/couchdb/couch_key_tree.erl
@@ -52,7 +52,7 @@ find_missing(_Tree, []) ->
find_missing([], Keys) ->
Keys;
find_missing([{Key, _, SubTree} | RestTree], Keys) ->
- SrcKeys2 = Keys -- Key,
+ SrcKeys2 = Keys -- [Key],
SrcKeys3 = find_missing(SubTree, SrcKeys2),
find_missing(RestTree, SrcKeys3).
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index bead047c..f6fd0fad 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -16,14 +16,6 @@
-export([replicate/2, replicate/3]).
--record(stats, {
- docs_read=0,
- read_errors=0,
- docs_copied=0,
- copy_errors=0
- }).
-
-
url_encode([H|T]) ->
if
H >= $a, $z >= H ->
@@ -87,7 +79,8 @@ replicate(Source, Target, Options) ->
false -> SeqNum0
end,
- {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum, #stats{}),
+ {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum),
+
case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
true ->
% nothing changed, don't record results
@@ -98,11 +91,7 @@ replicate(Source, Target, Options) ->
[{"start_time", StartTime},
{"end_time", httpd_util:rfc1123_date()},
{"start_last_seq", SeqNum},
- {"end_last_seq", NewSeqNum},
- {"docs_read", Stats#stats.docs_read},
- {"read_errors", Stats#stats.read_errors},
- {"docs_copied", Stats#stats.docs_copied},
- {"copy_errors", Stats#stats.copy_errors}]}
+ {"end_last_seq", NewSeqNum} | Stats]}
| tuple_to_list(proplists:get_value("history", OldRepHistoryProps, {}))],
% something changed, record results
NewRepHistory =
@@ -116,34 +105,89 @@ replicate(Source, Target, Options) ->
{ok, NewRepHistory}
end.
-pull_rep(DbTarget, DbSource, SourceSeqNum, Stats) ->
+pull_rep(DbTarget, DbSource, SourceSeqNum) ->
+ Parent = self(),
+ SaveDocsPid = spawn_link(fun() ->
+ save_docs_loop(Parent, DbTarget, 0) end),
+ OpenDocsPid = spawn_link(fun() ->
+ open_doc_revs_loop(Parent, DbSource, SaveDocsPid, 0) end),
+ MissingRevsPid = spawn_link(fun() ->
+ get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, 0, 0) end),
{ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum,
- fun(#doc_info{update_seq=Seq}=SrcDocInfo, _, {_, AccStats}) ->
- Stats2 = maybe_save_docs(DbTarget, DbSource, SrcDocInfo, AccStats),
- {ok, {Seq, Stats2}}
- end, {SourceSeqNum, Stats}),
- NewSeq.
-
+ fun(SrcDocInfo, _, _) ->
+ #doc_info{id=Id,
+ rev=Rev,
+ conflict_revs=Conflicts,
+ deleted_conflict_revs=DelConflicts,
+ update_seq=Seq} = SrcDocInfo,
+ SrcRevs = [Rev | Conflicts] ++ DelConflicts,
+ MissingRevsPid ! {Id, SrcRevs}, % send to the missing revs process
+ {ok, Seq}
+ end, SourceSeqNum),
+ MissingRevsPid ! shutdown,
+ receive {done, MissingRevsPid, Stats1} -> ok end,
+
+ OpenDocsPid ! shutdown,
+ receive {done, OpenDocsPid, Stats2} -> ok end,
+
+ SaveDocsPid ! shutdown,
+ receive {done, SaveDocsPid, Stats3} -> ok end,
+
+ {NewSeq, Stats1 ++ Stats2 ++ Stats3}.
+
+
+receive_id_revs() ->
+ receive
+ {Id, Revs} ->
+ [{Id, Revs} | receive_id_revs()]
+ after 1 ->
+ []
+ end.
-maybe_save_docs(DbTarget, DbSource,
- #doc_info{id=Id, rev=Rev, conflict_revs=Conflicts, deleted_conflict_revs=DelConflicts},
- Stats) ->
- SrcRevs = [Rev | Conflicts] ++ DelConflicts,
- {ok, [{Id, MissingRevs}]} = get_missing_revs(DbTarget, [{Id, SrcRevs}]),
+get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, RevsChecked, MissingFound) ->
+ receive
+ {Id, Revs} ->
+ Changed = [{Id, Revs} | receive_id_revs()],
+ {ok, Missing} = get_missing_revs(DbTarget, Changed),
+ [OpenDocsPid ! {Id0, MissingRevs} || {Id0, MissingRevs} <- Missing],
+ get_missing_revs_loop(Parent, DbTarget, OpenDocsPid,
+ RevsChecked + length(Changed),
+ MissingFound + length(Missing));
+ shutdown ->
+ Parent ! {done, self(), [{missing_checked, RevsChecked},
+ {missing_found, MissingFound}]}
+ end.
+
- case MissingRevs of
- [] ->
- Stats;
- _Else ->
+open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead) ->
+ receive
+ {Id, MissingRevs} ->
{ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
% only save successful reads
Docs = [RevDoc || {ok, RevDoc} <- DocResults],
- ok = save_docs(DbTarget, Docs, []),
+ SaveDocsPid ! Docs,
+ open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead + length(Docs));
+ shutdown ->
+ Parent ! {done, self(), [{docs_read, DocsRead}]}
+ end.
+
- Stats#stats{
- docs_read=Stats#stats.docs_read + length(Docs),
- read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs),
- docs_copied=Stats#stats.docs_copied + length(Docs)}
+receive_docs() ->
+ receive
+ Docs when is_list(Docs) ->
+ Docs ++ receive_docs()
+ after 1 ->
+ []
+ end.
+
+save_docs_loop(Parent, DbTarget, DocsWritten) ->
+ receive
+ Docs0 when is_list(Docs0) ->
+ Docs = Docs0 ++ receive_docs(),
+ ok = save_docs(DbTarget, Docs, []),
+ save_docs_loop(Parent, DbTarget, DocsWritten + length(Docs));
+ shutdown ->
+ Parent ! {done, self(), [{docs_written, DocsWritten}]}
end.
diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl
index 6fed6223..22318154 100644
--- a/src/couchdb/couch_server_sup.erl
+++ b/src/couchdb/couch_server_sup.erl
@@ -97,12 +97,6 @@ start_server(InputIniFilename) ->
brutal_kill,
worker,
[couch_server]},
- {couch_util,
- {couch_util, start_link, [UtilDriverDir]},
- permanent,
- brutal_kill,
- worker,
- [couch_util]},
{couch_query_servers,
{couch_query_servers, start_link, [QueryServers]},
permanent,
@@ -146,6 +140,7 @@ start_server(InputIniFilename) ->
io:format("couch ~s (LogLevel=~s)~n", [couch_server:get_version(), LogLevel]),
io:format("~s~n", [ConsoleStartupMsg]),
+ couch_util:start_driver(UtilDriverDir),
% ensure these applications are running
application:start(inets),
diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl
index 94648b0d..0c51f39b 100644
--- a/src/couchdb/couch_util.erl
+++ b/src/couchdb/couch_util.erl
@@ -11,32 +11,25 @@
% the License.
-module(couch_util).
--behaviour(gen_server).
--export([start_link/0,start_link/1]).
+-export([start_driver/1]).
-export([parse_ini/1,should_flush/0, should_flush/1]).
-export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]).
--export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1, test/0]).
+-export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1]).
-export([encodeBase64/1, decodeBase64/1]).
--export([init/1, terminate/2, handle_call/3]).
--export([handle_cast/2,code_change/3,handle_info/2]).
% arbitrarily chosen amount of memory to use before flushing to disk
-define(FLUSH_MAX_MEM, 10000000).
-start_link() ->
- start_link("").
-
-start_link("") ->
- start_link(filename:join(code:priv_dir(couch), "lib"));
-start_link(LibDir) ->
+start_driver("") ->
+ start_driver(filename:join(code:priv_dir(couch), "lib"));
+start_driver(LibDir) ->
case erl_ddll:load_driver(LibDir, "couch_erl_driver") of
ok -> ok;
{error, already_loaded} -> ok;
- {error, ErrorDesc} -> exit({error, ErrorDesc})
- end,
- gen_server:start_link({local, couch_util}, couch_util, [], []).
+ Error -> exit(Error)
+ end.
new_uuid() ->
@@ -45,9 +38,7 @@ new_uuid() ->
to_hex([]) ->
[];
to_hex([H|T]) ->
- Digit1 = H div 16,
- Digit2 = H rem 16,
- [to_digit(Digit1), to_digit(Digit2) | to_hex(T)].
+ [to_digit(H div 16), to_digit(H rem 16) | to_hex(T)].
to_digit(N) when N < 10 ->
$0 + N;
@@ -195,33 +186,6 @@ parse_ini(FileContents) ->
end, {"", []}, Lines),
{ok, lists:reverse(ParsedIniValues)}.
-init([]) ->
- {A,B,C} = erlang:now(),
- random:seed(A,B,C),
- {ok, dummy_server}.
-
-terminate(_Reason, _Server) ->
- ok.
-
-handle_call(rand32, _From, Server) ->
- {reply, rand32_int(), Server}.
-
-handle_cast(_Msg, State) ->
- {noreply,State}.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-
-
-
-
-rand32_int() ->
- random:uniform(16#FFFFFFFF + 1) - 1.
-
drv_port() ->
case get(couch_drv_port) of
undefined ->
@@ -331,7 +295,3 @@ dec(C) ->
62*?st(C,43) + ?st(C,47) + (C-59)*?st(C,48) - 69*?st(C,65) - 6*?st(C,97).
-
-test() ->
- start_link("debug"),
- collate("a","b",[]).