From 81bdbed444df2cbcf3cdb32f7d4a74019de06454 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Wed, 11 Aug 2010 15:22:33 -0400 Subject: reorganize couch .erl and driver code into rebar layout --- apps/couch/src/couch_rep_missing_revs.erl | 198 ++++++++++++++++++++++++++++++ 1 file changed, 198 insertions(+) create mode 100644 apps/couch/src/couch_rep_missing_revs.erl (limited to 'apps/couch/src/couch_rep_missing_revs.erl') diff --git a/apps/couch/src/couch_rep_missing_revs.erl b/apps/couch/src/couch_rep_missing_revs.erl new file mode 100644 index 00000000..1eff6774 --- /dev/null +++ b/apps/couch/src/couch_rep_missing_revs.erl @@ -0,0 +1,198 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep_missing_revs). +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/4, next/1, stop/1]). + +-define(BUFFER_SIZE, 1000). + +-include("couch_db.hrl"). + +-record (state, { + changes_loop, + changes_from = nil, + target, + parent, + complete = false, + count = 0, + reply_to = nil, + rows = queue:new(), + high_source_seq = 0, + high_missing_seq = 0, + high_committed_seq = 0 +}). + +start_link(Parent, Target, ChangesFeed, PostProps) -> + gen_server:start_link(?MODULE, [Parent, Target, ChangesFeed, PostProps], []). + +next(Server) -> + gen_server:call(Server, next_missing_revs, infinity). + +stop(Server) -> + gen_server:call(Server, stop). + +init([Parent, Target, ChangesFeed, _PostProps]) -> + process_flag(trap_exit, true), + Self = self(), + Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Target) end), + {ok, #state{changes_loop=Pid, target=Target, parent=Parent}}. + +handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) -> + State#state.parent ! {update_stats, missing_revs, length(Revs)}, + handle_add_missing_revs(HighSeq, Revs, From, State); + +handle_call(next_missing_revs, From, State) -> + handle_next_missing_revs(From, State). + +handle_cast({update_committed_seq, N}, State) -> + if State#state.high_committed_seq < N -> + ?LOG_DEBUG("missing_revs updating committed seq to ~p", [N]); + true -> ok end, + {noreply, State#state{high_committed_seq=N}}. + +handle_info({'EXIT', Pid, Reason}, #state{changes_loop=Pid} = State) -> + handle_changes_loop_exit(Reason, State); + +handle_info(Msg, State) -> + ?LOG_INFO("unexpected message ~p", [Msg]), + {noreply, State}. + +terminate(_Reason, #state{changes_loop=Pid}) when is_pid(Pid) -> + exit(Pid, shutdown), + ok; +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%internal funs + +handle_add_missing_revs(HighSeq, [], _From, State) -> + NewState = State#state{high_source_seq=HighSeq}, + maybe_checkpoint(NewState), + {reply, ok, NewState}; +handle_add_missing_revs(HighSeq, Revs, From, #state{reply_to=nil} = State) -> + #state{rows=Rows, count=Count} = State, + NewState = State#state{ + rows = queue:join(Rows, queue:from_list(Revs)), + count = Count + length(Revs), + high_source_seq = HighSeq, + high_missing_seq = HighSeq + }, + if NewState#state.count < ?BUFFER_SIZE -> + {reply, ok, NewState}; + true -> + {noreply, NewState#state{changes_from=From}} + end; +handle_add_missing_revs(HighSeq, Revs, _From, #state{count=0} = State) -> + gen_server:reply(State#state.reply_to, {HighSeq, Revs}), + NewState = State#state{ + high_source_seq = HighSeq, + high_missing_seq = HighSeq, + reply_to = nil + }, + {reply, ok, NewState}. + +handle_next_missing_revs(From, #state{count=0} = State) -> + if State#state.complete -> + {stop, normal, complete, State}; + true -> + {noreply, State#state{reply_to=From}} + end; +handle_next_missing_revs(_From, State) -> + #state{ + changes_from = ChangesFrom, + high_missing_seq = HighSeq, + rows = Rows + } = State, + if ChangesFrom =/= nil -> gen_server:reply(ChangesFrom, ok); true -> ok end, + NewState = State#state{count=0, changes_from=nil, rows=queue:new()}, + {reply, {HighSeq, queue:to_list(Rows)}, NewState}. + +handle_changes_loop_exit(normal, State) -> + if State#state.reply_to =/= nil -> + gen_server:reply(State#state.reply_to, complete), + {stop, normal, State}; + true -> + {noreply, State#state{complete=true, changes_loop=nil}} + end; +handle_changes_loop_exit(Reason, State) -> + {stop, Reason, State#state{changes_loop=nil}}. + +changes_loop(OurServer, SourceChangesServer, Target) -> + case couch_rep_changes_feed:next(SourceChangesServer) of + complete -> + exit(normal); + Changes -> + MissingRevs = get_missing_revs(Target, Changes), + gen_server:call(OurServer, {add_missing_revs, MissingRevs}, infinity) + end, + changes_loop(OurServer, SourceChangesServer, Target). + +get_missing_revs(#http_db{}=Target, Changes) -> + Transform = fun({Props}) -> + C = couch_util:get_value(<<"changes">>, Props), + Id = couch_util:get_value(<<"id">>, Props), + {Id, [R || {[{<<"rev">>, R}]} <- C]} + end, + IdRevsList = [Transform(Change) || Change <- Changes], + SeqDict = changes_dictionary(Changes), + {LastProps} = lists:last(Changes), + HighSeq = couch_util:get_value(<<"seq">>, LastProps), + Request = Target#http_db{ + resource = "_missing_revs", + method = post, + body = {IdRevsList} + }, + {Resp} = couch_rep_httpc:request(Request), + case couch_util:get_value(<<"missing_revs">>, Resp) of + {MissingRevs} -> + X = [{Id, dict:fetch(Id, SeqDict), couch_doc:parse_revs(RevStrs)} || + {Id,RevStrs} <- MissingRevs], + {HighSeq, X}; + _ -> + exit({target_error, couch_util:get_value(<<"error">>, Resp)}) + end; + +get_missing_revs(Target, Changes) -> + Transform = fun({Props}) -> + C = couch_util:get_value(<<"changes">>, Props), + Id = couch_util:get_value(<<"id">>, Props), + {Id, [couch_doc:parse_rev(R) || {[{<<"rev">>, R}]} <- C]} + end, + IdRevsList = [Transform(Change) || Change <- Changes], + SeqDict = changes_dictionary(Changes), + {LastProps} = lists:last(Changes), + HighSeq = couch_util:get_value(<<"seq">>, LastProps), + {ok, Results} = couch_db:get_missing_revs(Target, IdRevsList), + {HighSeq, [{Id, dict:fetch(Id, SeqDict), Revs} || {Id, Revs, _} <- Results]}. + +changes_dictionary(ChangeList) -> + KVs = [{couch_util:get_value(<<"id">>,C), couch_util:get_value(<<"seq">>,C)} + || {C} <- ChangeList], + dict:from_list(KVs). + +%% save a checkpoint if no revs are missing on target so we don't +%% rescan metadata unnecessarily +maybe_checkpoint(#state{high_missing_seq=N, high_committed_seq=N} = State) -> + #state{ + parent = Parent, + high_source_seq = SourceSeq + } = State, + Parent ! {missing_revs_checkpoint, SourceSeq}; +maybe_checkpoint(_State) -> + ok. -- cgit v1.2.3