summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep_reader.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-08-24 17:34:48 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-08-24 17:34:48 +0000
commit64d37bfd07c5e62561a38570f2cd3983567d3967 (patch)
tree5f6176962b594bc861654d7f5695b3e979d0f89d /src/couchdb/couch_rep_reader.erl
parente1b6dd15d993a3f424747ef5fc89ffa95a961613 (diff)
more precise and accurate calculation of replication progress
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@807308 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep_reader.erl')
-rw-r--r--src/couchdb/couch_rep_reader.erl167
1 files changed, 83 insertions, 84 deletions
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index a5a1fecd..4feb3804 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -38,10 +38,9 @@
reply_to = nil,
complete = false,
monitor_count = 0,
- monitor_count_by_seq = ets:new(monitor_count_by_seq, [set, private]),
- monitors_by_ref = ets:new(monitors_by_ref, [set, private]),
pending_doc_request = nil,
- high_missing_seq = 0
+ requested_seqs = [],
+ opened_seqs = []
}).
start_link(Parent, Source, MissingRevs, PostProps) ->
@@ -67,28 +66,25 @@ init([Parent, Source, MissingRevs, _PostProps]) ->
},
{ok, State}.
-handle_call({add_docs, Docs}, From, State) ->
+handle_call({add_docs, Seq, Docs}, From, State) ->
State#state.parent ! {update_stats, docs_read, length(Docs)},
- handle_add_docs(lists:flatten(Docs), From, State);
+ handle_add_docs(Seq, lists:flatten(Docs), From, State);
+
+handle_call({add_request_seqs, Seqs}, _From, State) ->
+ SeqList = State#state.requested_seqs,
+ {reply, ok, State#state{requested_seqs = lists:merge(Seqs, SeqList)}};
handle_call(next_docs, From, State) ->
handle_next_docs(From, State);
-handle_call({open_doc_revs, Id, Revs, HighSeq}, From, State) ->
- handle_open_doc_revs(Id, Revs, HighSeq, From, State);
-
-handle_call({set_monitor_count, Seq, Count}, _From, State) ->
- ets:insert(State#state.monitor_count_by_seq, {Seq,Count}),
- {reply, ok, State};
-
-handle_call({update_high_seq, HighSeq}, _From, State) ->
- {reply, ok, State#state{high_missing_seq=HighSeq}}.
+handle_call({open_remote_doc, Id, Seq, Revs}, From, State) ->
+ handle_open_remote_doc(Id, Seq, Revs, From, State).
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', Ref, _, _, Reason}, State) ->
- handle_monitor_down(Reason, Ref, State);
+handle_info({'DOWN', _, _, _, Reason}, State) ->
+ handle_monitor_down(Reason, State);
handle_info({'EXIT', Loop, complete}, #state{reader_loop=Loop} = State) ->
handle_reader_loop_complete(State).
@@ -102,83 +98,86 @@ code_change(_OldVsn, State, _Extra) ->
%internal funs
-handle_add_docs(DocsToAdd, From, #state{reply_to=nil} = State) ->
- NewState = State#state{
- docs = queue:join(State#state.docs, queue:from_list(DocsToAdd)),
- count = State#state.count + length(DocsToAdd)
+handle_add_docs(Seq, DocsToAdd, From, #state{reply_to=nil} = State) ->
+ State1 = update_sequence_lists(Seq, State),
+ NewState = State1#state{
+ docs = queue:join(State1#state.docs, queue:from_list(DocsToAdd)),
+ count = State1#state.count + length(DocsToAdd)
},
if NewState#state.count < ?BUFFER_SIZE ->
{reply, ok, NewState};
true ->
{noreply, NewState#state{reader_from=From}}
end;
-handle_add_docs(DocsToAdd, _From, #state{count=0} = State) ->
- HighSeq = State#state.high_missing_seq,
+handle_add_docs(Seq, DocsToAdd, _From, #state{count=0} = State) ->
+ NewState = update_sequence_lists(Seq, State),
+ HighSeq = calculate_new_high_seq(NewState),
gen_server:reply(State#state.reply_to, {HighSeq, DocsToAdd}),
- {reply, ok, State#state{reply_to=nil}}.
+ {reply, ok, NewState#state{reply_to=nil}}.
handle_next_docs(From, #state{count=0} = State) ->
if State#state.complete ->
- {stop, normal, {complete, State#state.high_missing_seq}, State};
+ {stop, normal, {complete, calculate_new_high_seq(State)}, State};
true ->
{noreply, State#state{reply_to=From}}
end;
handle_next_docs(_From, State) ->
#state{
reader_from = ReaderFrom,
- docs = Docs,
- high_missing_seq = HighSeq
+ docs = Docs
} = State,
if ReaderFrom =/= nil ->
gen_server:reply(ReaderFrom, ok);
true -> ok end,
NewState = State#state{count=0, reader_from=nil, docs=queue:new()},
- {reply, {HighSeq, queue:to_list(Docs)}, NewState}.
+ ?LOG_INFO("replying to next_docs with HighSeq ~p", [calculate_new_high_seq(State)]),
+ {reply, {calculate_new_high_seq(State), queue:to_list(Docs)}, NewState}.
-handle_open_doc_revs(Id, Revs, Seq, From, #state{monitor_count=N} = State)
+handle_open_remote_doc(Id, Seq, Revs, From, #state{monitor_count=N} = State)
when N > ?MAX_CONCURRENT_REQUESTS ->
- {noreply, State#state{pending_doc_request={From,Id,Revs,Seq}}};
-handle_open_doc_revs(Id, Revs, Seq, _From, #state{source=#http_db{}} = State) ->
+ {noreply, State#state{pending_doc_request={From,Id,Seq,Revs}}};
+handle_open_remote_doc(Id, Seq, Revs, _, #state{source=#http_db{}} = State) ->
#state{
monitor_count = Count,
- monitors_by_ref = MonitorsByRef,
source = Source
} = State,
- {_, Ref} = spawn_document_request(Source, Id, Revs),
- ets:insert(MonitorsByRef, {Ref, Seq}),
+ {_, _Ref} = spawn_document_request(Source, Id, Seq, Revs),
{reply, ok, State#state{monitor_count = Count+1}}.
-handle_monitor_down(normal, Ref, #state{pending_doc_request=nil,
+handle_monitor_down(normal, #state{pending_doc_request=nil,
monitor_count=1, complete=waiting_on_monitors} = State) ->
- N = calculate_new_high_seq(State, Ref),
- {noreply, State#state{complete=true, monitor_count=0, high_missing_seq=N}};
-handle_monitor_down(normal, Ref, #state{pending_doc_request=nil} = State) ->
+ {noreply, State#state{complete=true, monitor_count=0}};
+handle_monitor_down(normal, #state{pending_doc_request=nil} = State) ->
#state{monitor_count = Count} = State,
- HighSeq = calculate_new_high_seq(State, Ref),
- {noreply, State#state{monitor_count = Count-1, high_missing_seq=HighSeq}};
-handle_monitor_down(normal, Ref, State) ->
+ {noreply, State#state{monitor_count = Count-1}};
+handle_monitor_down(normal, State) ->
#state{
source = Source,
- monitors_by_ref = MonitorsByRef,
- pending_doc_request = {From, Id, Revs, Seq}
+ pending_doc_request = {From, Id, Seq, Revs}
} = State,
- HighSeq = calculate_new_high_seq(State, Ref),
gen_server:reply(From, ok),
- {_, NewRef} = spawn_document_request(Source, Id, Revs),
- ets:insert(MonitorsByRef, {NewRef, Seq}),
- {noreply, State#state{pending_doc_request=nil, high_missing_seq=HighSeq}};
-handle_monitor_down(Reason, _, State) ->
+ {_, _NewRef} = spawn_document_request(Source, Id, Seq, Revs),
+ {noreply, State#state{pending_doc_request=nil}};
+handle_monitor_down(Reason, State) ->
{stop, Reason, State}.
handle_reader_loop_complete(#state{reply_to=nil, monitor_count=0} = State) ->
{noreply, State#state{complete = true}};
handle_reader_loop_complete(#state{monitor_count=0} = State) ->
- HighSeq = State#state.high_missing_seq,
+ HighSeq = calculate_new_high_seq(State),
gen_server:reply(State#state.reply_to, {complete, HighSeq}),
{stop, normal, State};
handle_reader_loop_complete(State) ->
{noreply, State#state{complete = waiting_on_monitors}}.
+calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) ->
+ Open;
+calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]})
+ when Req < Open ->
+ 0;
+calculate_new_high_seq(State) ->
+ hd(State#state.opened_seqs).
+
split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) ->
case Length+size(Rev) > 8192 of
false ->
@@ -187,6 +186,31 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) ->
{[[Rev],CurrentAcc|Rest], BaseLength, BaseLength}
end.
+% We store outstanding requested sequences and a subset of already opened
+% sequences in 2 ordered lists. The subset of opened seqs is a) the largest
+% opened seq smaller than the smallest outstanding request seq plus b) all the
+% opened seqs greater than the smallest outstanding request. I believe its the
+% minimal set of info needed to correctly calculate which seqs have been
+% replicated (because remote docs can be opened out-of-order) -- APK
+update_sequence_lists(Seq, State) ->
+ Requested = lists:delete(Seq, State#state.requested_seqs),
+ AllOpened = lists:merge([Seq], State#state.opened_seqs),
+ Opened = case Requested of
+ [] ->
+ [lists:last(AllOpened)];
+ [EarliestReq|_] ->
+ case lists:splitwith(fun(X) -> X < EarliestReq end, AllOpened) of
+ {[], Greater} ->
+ Greater;
+ {Less, Greater} ->
+ [lists:last(Less) | Greater]
+ end
+ end,
+ State#state{
+ requested_seqs = Requested,
+ opened_seqs = Opened
+ }.
+
open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
%% all this logic just splits up revision lists that are too long for
%% MochiWeb into multiple requests
@@ -214,25 +238,24 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
reader_loop(ReaderServer, Source, MissingRevsServer) ->
case couch_rep_missing_revs:next(MissingRevsServer) of
complete ->
- % ?LOG_INFO("reader_loop terminating with complete", []),
exit(complete);
{HighSeq, IdsRevs} ->
- % ?LOG_DEBUG("got IdsRevs ~p", [IdsRevs]),
+ % to be safe, make sure Results are sorted by source_seq
+ SortedIdsRevs = lists:keysort(2, IdsRevs),
+ RequestSeqs = [S || {_,S,_} <- SortedIdsRevs],
+ gen_server:call(ReaderServer, {add_request_seqs, RequestSeqs}),
case Source of
#http_db{} ->
- N = length(IdsRevs),
- gen_server:call(ReaderServer, {set_monitor_count, HighSeq, N}),
- [gen_server:call(ReaderServer, {open_doc_revs, Id, Revs, HighSeq})
- || {Id,Revs} <- IdsRevs],
+ [gen_server:call(ReaderServer, {open_remote_doc, Id, Seq, Revs})
+ || {Id,Seq,Revs} <- SortedIdsRevs],
reader_loop(ReaderServer, Source, MissingRevsServer);
_Local ->
Source2 = maybe_reopen_db(Source, HighSeq),
- lists:foreach(fun({Id,Revs}) ->
+ lists:foreach(fun({Id,Seq,Revs}) ->
{ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]),
JustTheDocs = [Doc || {ok, Doc} <- Docs],
- gen_server:call(ReaderServer, {add_docs, JustTheDocs})
- end, IdsRevs),
- gen_server:call(ReaderServer, {update_high_seq, HighSeq}),
+ gen_server:call(ReaderServer, {add_docs, Seq, JustTheDocs})
+ end, SortedIdsRevs),
reader_loop(ReaderServer, Source2, MissingRevsServer)
end
end.
@@ -243,34 +266,10 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
maybe_reopen_db(Db, _HighSeq) ->
Db.
-spawn_document_request(Source, Id, Revs) ->
+spawn_document_request(Source, Id, Seq, Revs) ->
Server = self(),
SpawnFun = fun() ->
Results = open_doc_revs(Source, Id, Revs),
- gen_server:call(Server, {add_docs, Results})
+ gen_server:call(Server, {add_docs, Seq, Results})
end,
spawn_monitor(SpawnFun).
-
-%% check if any more HTTP requests are pending for this update sequence
-calculate_new_high_seq(State, Ref) ->
- #state{
- monitors_by_ref = MonitorsByRef,
- monitor_count_by_seq = MonitorCountBySeq,
- high_missing_seq = OldSeq
- } = State,
- Seq = ets:lookup_element(MonitorsByRef, Ref, 2),
- ets:delete(MonitorsByRef, Ref),
- case ets:update_counter(MonitorCountBySeq, Seq, -1) of
- 0 ->
- ets:delete(MonitorCountBySeq, Seq),
- case ets:first(MonitorCountBySeq) of
- Key when Key > Seq ->
- Seq;
- '$end_of_table' ->
- Seq;
- _Else ->
- OldSeq
- end;
- _Else ->
- OldSeq
- end.