diff options
-rw-r--r-- | src/couchdb/couch_rep_missing_revs.erl | 12 | ||||
-rw-r--r-- | src/couchdb/couch_rep_reader.erl | 167 |
2 files changed, 93 insertions, 86 deletions
diff --git a/src/couchdb/couch_rep_missing_revs.erl b/src/couchdb/couch_rep_missing_revs.erl index 7e1dc16a..59ab30ec 100644 --- a/src/couchdb/couch_rep_missing_revs.erl +++ b/src/couchdb/couch_rep_missing_revs.erl @@ -148,6 +148,7 @@ get_missing_revs(#http_db{}=Target, Changes) -> Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) -> {Id, [couch_doc:rev_to_str(R) || {[{<<"rev">>, R}]} <- C]} end, IdRevsList = [Transform(Change) || Change <- Changes], + SeqDict = changes_dictionary(Changes), {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes), Request = Target#http_db{ resource = "_missing_revs", @@ -156,16 +157,23 @@ get_missing_revs(#http_db{}=Target, Changes) -> }, {Resp} = couch_rep_httpc:request(Request), {MissingRevs} = proplists:get_value(<<"missing_revs">>, Resp), - X = [{Id, couch_doc:parse_revs(RevStrs)} || {Id,RevStrs} <- MissingRevs], + X = [{Id, dict:fetch(Id, SeqDict), couch_doc:parse_revs(RevStrs)} || + {Id,RevStrs} <- MissingRevs], {HighSeq, X}; get_missing_revs(Target, Changes) -> Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) -> {Id, [R || {[{<<"rev">>, R}]} <- C]} end, IdRevsList = [Transform(Change) || Change <- Changes], + SeqDict = changes_dictionary(Changes), {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes), {ok, Results} = couch_db:get_missing_revs(Target, IdRevsList), - {HighSeq, Results}. + {HighSeq, [{Id, dict:fetch(Id, SeqDict), Revs} || {Id, Revs} <- Results]}. + +changes_dictionary(ChangeList) -> + KVs = [{proplists:get_value(<<"id">>,C), proplists:get_value(<<"seq">>,C)} + || {C} <- ChangeList], + dict:from_list(KVs). %% save a checkpoint if no revs are missing on target so we don't %% rescan metadata unnecessarily diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl index a5a1fecd..4feb3804 100644 --- a/src/couchdb/couch_rep_reader.erl +++ b/src/couchdb/couch_rep_reader.erl @@ -38,10 +38,9 @@ reply_to = nil, complete = false, monitor_count = 0, - monitor_count_by_seq = ets:new(monitor_count_by_seq, [set, private]), - monitors_by_ref = ets:new(monitors_by_ref, [set, private]), pending_doc_request = nil, - high_missing_seq = 0 + requested_seqs = [], + opened_seqs = [] }). start_link(Parent, Source, MissingRevs, PostProps) -> @@ -67,28 +66,25 @@ init([Parent, Source, MissingRevs, _PostProps]) -> }, {ok, State}. -handle_call({add_docs, Docs}, From, State) -> +handle_call({add_docs, Seq, Docs}, From, State) -> State#state.parent ! {update_stats, docs_read, length(Docs)}, - handle_add_docs(lists:flatten(Docs), From, State); + handle_add_docs(Seq, lists:flatten(Docs), From, State); + +handle_call({add_request_seqs, Seqs}, _From, State) -> + SeqList = State#state.requested_seqs, + {reply, ok, State#state{requested_seqs = lists:merge(Seqs, SeqList)}}; handle_call(next_docs, From, State) -> handle_next_docs(From, State); -handle_call({open_doc_revs, Id, Revs, HighSeq}, From, State) -> - handle_open_doc_revs(Id, Revs, HighSeq, From, State); - -handle_call({set_monitor_count, Seq, Count}, _From, State) -> - ets:insert(State#state.monitor_count_by_seq, {Seq,Count}), - {reply, ok, State}; - -handle_call({update_high_seq, HighSeq}, _From, State) -> - {reply, ok, State#state{high_missing_seq=HighSeq}}. +handle_call({open_remote_doc, Id, Seq, Revs}, From, State) -> + handle_open_remote_doc(Id, Seq, Revs, From, State). handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', Ref, _, _, Reason}, State) -> - handle_monitor_down(Reason, Ref, State); +handle_info({'DOWN', _, _, _, Reason}, State) -> + handle_monitor_down(Reason, State); handle_info({'EXIT', Loop, complete}, #state{reader_loop=Loop} = State) -> handle_reader_loop_complete(State). @@ -102,83 +98,86 @@ code_change(_OldVsn, State, _Extra) -> %internal funs -handle_add_docs(DocsToAdd, From, #state{reply_to=nil} = State) -> - NewState = State#state{ - docs = queue:join(State#state.docs, queue:from_list(DocsToAdd)), - count = State#state.count + length(DocsToAdd) +handle_add_docs(Seq, DocsToAdd, From, #state{reply_to=nil} = State) -> + State1 = update_sequence_lists(Seq, State), + NewState = State1#state{ + docs = queue:join(State1#state.docs, queue:from_list(DocsToAdd)), + count = State1#state.count + length(DocsToAdd) }, if NewState#state.count < ?BUFFER_SIZE -> {reply, ok, NewState}; true -> {noreply, NewState#state{reader_from=From}} end; -handle_add_docs(DocsToAdd, _From, #state{count=0} = State) -> - HighSeq = State#state.high_missing_seq, +handle_add_docs(Seq, DocsToAdd, _From, #state{count=0} = State) -> + NewState = update_sequence_lists(Seq, State), + HighSeq = calculate_new_high_seq(NewState), gen_server:reply(State#state.reply_to, {HighSeq, DocsToAdd}), - {reply, ok, State#state{reply_to=nil}}. + {reply, ok, NewState#state{reply_to=nil}}. handle_next_docs(From, #state{count=0} = State) -> if State#state.complete -> - {stop, normal, {complete, State#state.high_missing_seq}, State}; + {stop, normal, {complete, calculate_new_high_seq(State)}, State}; true -> {noreply, State#state{reply_to=From}} end; handle_next_docs(_From, State) -> #state{ reader_from = ReaderFrom, - docs = Docs, - high_missing_seq = HighSeq + docs = Docs } = State, if ReaderFrom =/= nil -> gen_server:reply(ReaderFrom, ok); true -> ok end, NewState = State#state{count=0, reader_from=nil, docs=queue:new()}, - {reply, {HighSeq, queue:to_list(Docs)}, NewState}. + ?LOG_INFO("replying to next_docs with HighSeq ~p", [calculate_new_high_seq(State)]), + {reply, {calculate_new_high_seq(State), queue:to_list(Docs)}, NewState}. -handle_open_doc_revs(Id, Revs, Seq, From, #state{monitor_count=N} = State) +handle_open_remote_doc(Id, Seq, Revs, From, #state{monitor_count=N} = State) when N > ?MAX_CONCURRENT_REQUESTS -> - {noreply, State#state{pending_doc_request={From,Id,Revs,Seq}}}; -handle_open_doc_revs(Id, Revs, Seq, _From, #state{source=#http_db{}} = State) -> + {noreply, State#state{pending_doc_request={From,Id,Seq,Revs}}}; +handle_open_remote_doc(Id, Seq, Revs, _, #state{source=#http_db{}} = State) -> #state{ monitor_count = Count, - monitors_by_ref = MonitorsByRef, source = Source } = State, - {_, Ref} = spawn_document_request(Source, Id, Revs), - ets:insert(MonitorsByRef, {Ref, Seq}), + {_, _Ref} = spawn_document_request(Source, Id, Seq, Revs), {reply, ok, State#state{monitor_count = Count+1}}. -handle_monitor_down(normal, Ref, #state{pending_doc_request=nil, +handle_monitor_down(normal, #state{pending_doc_request=nil, monitor_count=1, complete=waiting_on_monitors} = State) -> - N = calculate_new_high_seq(State, Ref), - {noreply, State#state{complete=true, monitor_count=0, high_missing_seq=N}}; -handle_monitor_down(normal, Ref, #state{pending_doc_request=nil} = State) -> + {noreply, State#state{complete=true, monitor_count=0}}; +handle_monitor_down(normal, #state{pending_doc_request=nil} = State) -> #state{monitor_count = Count} = State, - HighSeq = calculate_new_high_seq(State, Ref), - {noreply, State#state{monitor_count = Count-1, high_missing_seq=HighSeq}}; -handle_monitor_down(normal, Ref, State) -> + {noreply, State#state{monitor_count = Count-1}}; +handle_monitor_down(normal, State) -> #state{ source = Source, - monitors_by_ref = MonitorsByRef, - pending_doc_request = {From, Id, Revs, Seq} + pending_doc_request = {From, Id, Seq, Revs} } = State, - HighSeq = calculate_new_high_seq(State, Ref), gen_server:reply(From, ok), - {_, NewRef} = spawn_document_request(Source, Id, Revs), - ets:insert(MonitorsByRef, {NewRef, Seq}), - {noreply, State#state{pending_doc_request=nil, high_missing_seq=HighSeq}}; -handle_monitor_down(Reason, _, State) -> + {_, _NewRef} = spawn_document_request(Source, Id, Seq, Revs), + {noreply, State#state{pending_doc_request=nil}}; +handle_monitor_down(Reason, State) -> {stop, Reason, State}. handle_reader_loop_complete(#state{reply_to=nil, monitor_count=0} = State) -> {noreply, State#state{complete = true}}; handle_reader_loop_complete(#state{monitor_count=0} = State) -> - HighSeq = State#state.high_missing_seq, + HighSeq = calculate_new_high_seq(State), gen_server:reply(State#state.reply_to, {complete, HighSeq}), {stop, normal, State}; handle_reader_loop_complete(State) -> {noreply, State#state{complete = waiting_on_monitors}}. +calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) -> + Open; +calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]}) + when Req < Open -> + 0; +calculate_new_high_seq(State) -> + hd(State#state.opened_seqs). + split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) -> case Length+size(Rev) > 8192 of false -> @@ -187,6 +186,31 @@ split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) -> {[[Rev],CurrentAcc|Rest], BaseLength, BaseLength} end. +% We store outstanding requested sequences and a subset of already opened +% sequences in 2 ordered lists. The subset of opened seqs is a) the largest +% opened seq smaller than the smallest outstanding request seq plus b) all the +% opened seqs greater than the smallest outstanding request. I believe its the +% minimal set of info needed to correctly calculate which seqs have been +% replicated (because remote docs can be opened out-of-order) -- APK +update_sequence_lists(Seq, State) -> + Requested = lists:delete(Seq, State#state.requested_seqs), + AllOpened = lists:merge([Seq], State#state.opened_seqs), + Opened = case Requested of + [] -> + [lists:last(AllOpened)]; + [EarliestReq|_] -> + case lists:splitwith(fun(X) -> X < EarliestReq end, AllOpened) of + {[], Greater} -> + Greater; + {Less, Greater} -> + [lists:last(Less) | Greater] + end + end, + State#state{ + requested_seqs = Requested, + opened_seqs = Opened + }. + open_doc_revs(#http_db{} = DbS, DocId, Revs) -> %% all this logic just splits up revision lists that are too long for %% MochiWeb into multiple requests @@ -214,25 +238,24 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) -> reader_loop(ReaderServer, Source, MissingRevsServer) -> case couch_rep_missing_revs:next(MissingRevsServer) of complete -> - % ?LOG_INFO("reader_loop terminating with complete", []), exit(complete); {HighSeq, IdsRevs} -> - % ?LOG_DEBUG("got IdsRevs ~p", [IdsRevs]), + % to be safe, make sure Results are sorted by source_seq + SortedIdsRevs = lists:keysort(2, IdsRevs), + RequestSeqs = [S || {_,S,_} <- SortedIdsRevs], + gen_server:call(ReaderServer, {add_request_seqs, RequestSeqs}), case Source of #http_db{} -> - N = length(IdsRevs), - gen_server:call(ReaderServer, {set_monitor_count, HighSeq, N}), - [gen_server:call(ReaderServer, {open_doc_revs, Id, Revs, HighSeq}) - || {Id,Revs} <- IdsRevs], + [gen_server:call(ReaderServer, {open_remote_doc, Id, Seq, Revs}) + || {Id,Seq,Revs} <- SortedIdsRevs], reader_loop(ReaderServer, Source, MissingRevsServer); _Local -> Source2 = maybe_reopen_db(Source, HighSeq), - lists:foreach(fun({Id,Revs}) -> + lists:foreach(fun({Id,Seq,Revs}) -> {ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]), JustTheDocs = [Doc || {ok, Doc} <- Docs], - gen_server:call(ReaderServer, {add_docs, JustTheDocs}) - end, IdsRevs), - gen_server:call(ReaderServer, {update_high_seq, HighSeq}), + gen_server:call(ReaderServer, {add_docs, Seq, JustTheDocs}) + end, SortedIdsRevs), reader_loop(ReaderServer, Source2, MissingRevsServer) end end. @@ -243,34 +266,10 @@ maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq -> maybe_reopen_db(Db, _HighSeq) -> Db. -spawn_document_request(Source, Id, Revs) -> +spawn_document_request(Source, Id, Seq, Revs) -> Server = self(), SpawnFun = fun() -> Results = open_doc_revs(Source, Id, Revs), - gen_server:call(Server, {add_docs, Results}) + gen_server:call(Server, {add_docs, Seq, Results}) end, spawn_monitor(SpawnFun). - -%% check if any more HTTP requests are pending for this update sequence -calculate_new_high_seq(State, Ref) -> - #state{ - monitors_by_ref = MonitorsByRef, - monitor_count_by_seq = MonitorCountBySeq, - high_missing_seq = OldSeq - } = State, - Seq = ets:lookup_element(MonitorsByRef, Ref, 2), - ets:delete(MonitorsByRef, Ref), - case ets:update_counter(MonitorCountBySeq, Seq, -1) of - 0 -> - ets:delete(MonitorCountBySeq, Seq), - case ets:first(MonitorCountBySeq) of - Key when Key > Seq -> - Seq; - '$end_of_table' -> - Seq; - _Else -> - OldSeq - end; - _Else -> - OldSeq - end. |