diff options
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/Makefile.am | 2 | ||||
-rw-r--r-- | src/couchdb/couch_rep_missing_revs.erl | 180 |
2 files changed, 182 insertions, 0 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index c3806310..f32497fa 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -76,6 +76,7 @@ source_files = \ couch_rep.erl \ couch_rep_changes_feed.erl \ couch_rep_httpc.erl \ + couch_rep_missing_revs.erl \ couch_rep_sup.erl \ couch_server.erl \ couch_server_sup.erl \ @@ -123,6 +124,7 @@ compiled_files = \ couch_ref_counter.beam \ couch_rep.beam \ couch_rep_changes_feed.beam \ + couch_rep_missing_revs.beam \ couch_rep_httpc.beam \ couch_rep_sup.beam \ couch_server.beam \ diff --git a/src/couchdb/couch_rep_missing_revs.erl b/src/couchdb/couch_rep_missing_revs.erl new file mode 100644 index 00000000..bd7cda66 --- /dev/null +++ b/src/couchdb/couch_rep_missing_revs.erl @@ -0,0 +1,180 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep_missing_revs). +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/4, next/1, stop/1]). + +-define(BUFFER_SIZE, 1000). + +-include("couch_db.hrl"). + +-record (state, { + changes_loop, + changes_from = nil, + target, + parent, + complete = false, + count = 0, + reply_to = nil, + rows = queue:new(), + high_source_seq = 0, + high_missing_seq = 0, + high_committed_seq = 0 +}). + +start_link(Parent, Target, ChangesFeed, PostProps) -> + gen_server:start_link(?MODULE, [Parent, Target, ChangesFeed, PostProps], []). + +next(Server) -> + gen_server:call(Server, next_missing_revs, infinity). + +stop(Server) -> + gen_server:call(Server, stop). + +init([Parent, Target, ChangesFeed, _PostProps]) -> + process_flag(trap_exit, true), + Self = self(), + Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Target) end), + {ok, #state{changes_loop=Pid, target=Target, parent=Parent}}. + +handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) -> + handle_add_missing_revs(HighSeq, Revs, From, State); + +handle_call(next_missing_revs, From, State) -> + handle_next_missing_revs(From, State); + +handle_call({update_committed_seq, N}, _From, State) -> + if State#state.high_committed_seq < N -> + ?LOG_DEBUG("missing_revs updating committed seq to ~p", [N]); + true -> ok end, + {reply, ok, State#state{high_committed_seq=N}}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'EXIT', Pid, Reason}, #state{changes_loop=Pid} = State) -> + handle_changes_loop_exit(Reason, State); + +handle_info(Msg, State) -> + ?LOG_INFO("unexpected message ~p", [Msg]), + {noreply, State}. + +terminate(_Reason, #state{changes_loop=Pid}) when is_pid(Pid) -> + exit(Pid, shutdown), + ok; +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%internal funs + +handle_add_missing_revs(HighSeq, [], _From, State) -> + maybe_checkpoint(State), + {reply, ok, State#state{high_source_seq=HighSeq}}; +handle_add_missing_revs(HighSeq, Revs, From, #state{reply_to=nil} = State) -> + #state{rows=Rows, count=Count} = State, + NewState = State#state{ + rows = queue:join(Rows, queue:from_list(Revs)), + count = Count + length(Revs), + high_source_seq = HighSeq, + high_missing_seq = HighSeq + }, + if NewState#state.count < ?BUFFER_SIZE -> + {reply, ok, NewState}; + true -> + {noreply, NewState#state{changes_from=From}} + end; +handle_add_missing_revs(HighSeq, Revs, _From, #state{count=0} = State) -> + gen_server:reply(State#state.reply_to, {HighSeq, Revs}), + NewState = State#state{ + high_source_seq = HighSeq, + high_missing_seq = HighSeq, + reply_to = nil + }, + {reply, ok, NewState}. + +handle_next_missing_revs(From, #state{count=0} = State) -> + if State#state.complete -> + {stop, normal, complete, State}; + true -> + {noreply, State#state{reply_to=From}} + end; +handle_next_missing_revs(_From, State) -> + #state{ + changes_from = ChangesFrom, + high_missing_seq = HighSeq, + rows = Rows + } = State, + if ChangesFrom =/= nil -> gen_server:reply(ChangesFrom, ok); true -> ok end, + NewState = State#state{count=0, changes_from=nil, rows=queue:new()}, + {reply, {HighSeq, queue:to_list(Rows)}, NewState}. + +handle_changes_loop_exit(normal, State) -> + if State#state.reply_to =/= nil -> + gen_server:reply(State#state.reply_to, complete), + {stop, normal, State}; + true -> + {noreply, State#state{complete=true, changes_loop=nil}} + end; +handle_changes_loop_exit(Reason, State) -> + ?LOG_ERROR("changes_loop died with reason ~p", [Reason]), + {stop, changes_loop_died, State#state{changes_loop=nil}}. + +changes_loop(OurServer, SourceChangesServer, Target) -> + case couch_rep_changes_feed:next(SourceChangesServer) of + complete -> + exit(normal); + Changes -> + MissingRevs = get_missing_revs(Target, Changes), + gen_server:call(OurServer, {add_missing_revs, MissingRevs}, infinity) + end, + changes_loop(OurServer, SourceChangesServer, Target). + +get_missing_revs(#http_db{}=Target, Changes) -> + Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) -> + {Id, [couch_doc:rev_to_str(R) || {[{<<"rev">>, R}]} <- C]} end, + IdRevsList = [Transform(Change) || Change <- Changes], + {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes), + Request = Target#http_db{ + resource = "_missing_revs", + method = post, + body = {IdRevsList} + }, + {Resp} = couch_rep_httpc:request(Request), + {MissingRevs} = proplists:get_value(<<"missing_revs">>, Resp), + X = [{Id, couch_doc:parse_revs(RevStrs)} || {Id,RevStrs} <- MissingRevs], + {HighSeq, X}; + +get_missing_revs(Target, Changes) -> + Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) -> + {Id, [R || {[{<<"rev">>, R}]} <- C]} end, + IdRevsList = [Transform(Change) || Change <- Changes], + {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes), + {ok, Results} = couch_db:get_missing_revs(Target, IdRevsList), + {HighSeq, Results}. + +%% save a checkpoint if no revs are missing on target so we don't +%% rescan metadata unnecessarily +maybe_checkpoint(#state{high_missing_seq=N, high_committed_seq=N} = State) -> + #state{ + parent = Parent, + high_source_seq = SourceSeq + } = State, + Parent ! {missing_revs_checkpoint, SourceSeq}; +maybe_checkpoint(_State) -> + ok. |